diff --git a/dubbo-demo/dubbo-demo-xds/dubbo-demo-xds-consumer/pom.xml b/dubbo-demo/dubbo-demo-xds/dubbo-demo-xds-consumer/pom.xml index 73d3b92573e..d2961735336 100644 --- a/dubbo-demo/dubbo-demo-xds/dubbo-demo-xds-consumer/pom.xml +++ b/dubbo-demo/dubbo-demo-xds/dubbo-demo-xds-consumer/pom.xml @@ -37,6 +37,12 @@ ${project.parent.version} + + org.apache.dubbo + dubbo-remoting-http3 + ${project.parent.version} + + org.apache.dubbo dubbo-triple-servlet diff --git a/dubbo-demo/dubbo-demo-xds/dubbo-demo-xds-consumer/src/main/java/org/apache/dubbo/xds/demo/consumer/XdsConsumerApplication.java b/dubbo-demo/dubbo-demo-xds/dubbo-demo-xds-consumer/src/main/java/org/apache/dubbo/xds/demo/consumer/XdsConsumerApplication.java index 4030a361eaf..9b714b7a186 100644 --- a/dubbo-demo/dubbo-demo-xds/dubbo-demo-xds-consumer/src/main/java/org/apache/dubbo/xds/demo/consumer/XdsConsumerApplication.java +++ b/dubbo-demo/dubbo-demo-xds/dubbo-demo-xds-consumer/src/main/java/org/apache/dubbo/xds/demo/consumer/XdsConsumerApplication.java @@ -29,7 +29,7 @@ @Service @EnableDubbo public class XdsConsumerApplication { - @DubboReference(providedBy = "dubbo-demo-xds-provider") + @DubboReference(providedBy = "echo:7070") private DemoService demoService; public static void main(String[] args) throws InterruptedException { diff --git a/dubbo-demo/dubbo-demo-xds/dubbo-demo-xds-consumer/src/main/resources/application.yml b/dubbo-demo/dubbo-demo-xds/dubbo-demo-xds-consumer/src/main/resources/application.yml index 251a6afdac8..4c2144dc3ab 100644 --- a/dubbo-demo/dubbo-demo-xds/dubbo-demo-xds-consumer/src/main/resources/application.yml +++ b/dubbo-demo/dubbo-demo-xds/dubbo-demo-xds-consumer/src/main/resources/application.yml @@ -26,6 +26,6 @@ dubbo: name: tri port: 50050 registry: - address: xds://istiod.istio-system.svc:15010?security=plaintext # istio://istiod.istio-system.svc:15012 + address: xds://47.251.12.148:15010?security=plaintext # istio://istiod.istio-system.svc:15012 diff --git a/dubbo-demo/dubbo-demo-xds/dubbo-demo-xds-consumer/src/main/resources/bootstrap.json b/dubbo-demo/dubbo-demo-xds/dubbo-demo-xds-consumer/src/main/resources/bootstrap.json new file mode 100644 index 00000000000..54f47100d63 --- /dev/null +++ b/dubbo-demo/dubbo-demo-xds/dubbo-demo-xds-consumer/src/main/resources/bootstrap.json @@ -0,0 +1,92 @@ +{ + "xds_servers": [ + { + "server_uri": "47.251.12.148:15010", + "channel_creds": [ + { + "type": "insecure" + } + ], + "server_features": [ + "xds_v3" + ] + } + ], + "node": { + "id": "sidecar~192.168.19.141~echo-v1-5764868574-whqs9.echo-grpc~echo-grpc.svc.cluster.local", + "metadata": { + "ANNOTATIONS": { + "inject.istio.io/templates": "grpc-agent", + "istio.io/rev": "default", + "kubectl.kubernetes.io/default-container": "app", + "kubectl.kubernetes.io/default-logs-container": "app", + "kubernetes.io/config.seen": "2024-07-02T17:22:26.354582057+08:00", + "kubernetes.io/config.source": "api", + "prometheus.io/path": "/stats/prometheus", + "prometheus.io/port": "15020", + "prometheus.io/scrape": "true", + "proxy.istio.io/config": "{\"holdApplicationUntilProxyStarts\": true}", + "proxy.istio.io/overrides": "{\"containers\":[{\"name\":\"app\",\"image\":\"gcr.io/istio-testing/app:latest\",\"args\":[\"--metrics=15014\",\"--port\",\"18080\",\"--tcp\",\"19090\",\"--xds-grpc-server=17070\",\"--grpc\",\"17070\",\"--grpc\",\"17171\",\"--port\",\"3333\",\"--port\",\"8080\",\"--version\",\"v1\",\"--crt=/cert.crt\",\"--key=/cert.key\"],\"ports\":[{\"containerPort\":17070,\"protocol\":\"TCP\"},{\"containerPort\":17171,\"protocol\":\"TCP\"},{\"containerPort\":8080,\"protocol\":\"TCP\"},{\"name\":\"tcp-health-port\",\"containerPort\":3333,\"protocol\":\"TCP\"}],\"env\":[{\"name\":\"INSTANCE_IP\",\"valueFrom\":{\"fieldRef\":{\"apiVersion\":\"v1\",\"fieldPath\":\"status.podIP\"}}}],\"resources\":{},\"volumeMounts\":[{\"name\":\"kube-api-access-4qkzb\",\"readOnly\":true,\"mountPath\":\"/var/run/secrets/kubernetes.io/serviceaccount\"}],\"livenessProbe\":{\"tcpSocket\":{\"port\":\"tcp-health-port\"},\"initialDelaySeconds\":10,\"timeoutSeconds\":1,\"periodSeconds\":10,\"successThreshold\":1,\"failureThreshold\":10},\"readinessProbe\":{\"httpGet\":{\"path\":\"/\",\"port\":8080,\"scheme\":\"HTTP\"},\"initialDelaySeconds\":1,\"timeoutSeconds\":1,\"periodSeconds\":2,\"successThreshold\":1,\"failureThreshold\":10},\"startupProbe\":{\"tcpSocket\":{\"port\":\"tcp-health-port\"},\"timeoutSeconds\":1,\"periodSeconds\":10,\"successThreshold\":1,\"failureThreshold\":10},\"terminationMessagePath\":\"/dev/termination-log\",\"terminationMessagePolicy\":\"File\",\"imagePullPolicy\":\"Always\"}]}", + "sidecar.istio.io/rewriteAppHTTPProbers": "false", + "sidecar.istio.io/status": "{\"initContainers\":null,\"containers\":[\"istio-proxy\",\"app\"],\"volumes\":[\"workload-socket\",\"workload-certs\",\"istio-xds\",\"istio-data\",\"istio-podinfo\",\"istiod-ca-cert\"],\"imagePullSecrets\":null,\"revision\":\"default\"}" + }, + "APP_CONTAINERS": "app", + "CLUSTER_ID": "Kubernetes", + "ENVOY_PROMETHEUS_PORT": 15090, + "ENVOY_STATUS_PORT": 15021, + "GENERATOR": "grpc", + "INSTANCE_IPS": "192.168.19.141", + "ISTIO_PROXY_SHA": "7b292c7175692c822148b64005a731eb00365508", + "ISTIO_VERSION": "1.20.2", + "LABELS": { + "app": "echo", + "service.istio.io/canonical-name": "echo", + "service.istio.io/canonical-revision": "v1", + "version": "v1" + }, + "MESH_ID": "cluster.local", + "NAME": "echo-v1-5859d7bc7d-wlb2d", + "NAMESPACE": "echo-grpc", + "NODE_NAME": "us-west-1.192.168.19.107", + "OWNER": "kubernetes://apis/apps/v1/namespaces/echo-grpc/deployments/echo-v1", + "PILOT_SAN": [ + "istiod.istio-system.svc" + ], + "POD_PORTS": "[{\"containerPort\":17070,\"protocol\":\"TCP\"},{\"containerPort\":17171,\"protocol\":\"TCP\"},{\"containerPort\":8080,\"protocol\":\"TCP\"},{\"name\":\"tcp-health-port\",\"containerPort\":3333,\"protocol\":\"TCP\"}]", + "PROXY_CONFIG": { + "binaryPath": "/usr/local/bin/envoy", + "configPath": "./etc/istio/proxy", + "controlPlaneAuthPolicy": "MUTUAL_TLS", + "discoveryAddress": "istiod.istio-system.svc:15012", + "drainDuration": "45s", + "holdApplicationUntilProxyStarts": true, + "proxyAdminPort": 15000, + "serviceCluster": "istio-proxy", + "statNameLength": 189, + "statusPort": 15020, + "terminationDrainDuration": "5s", + "tracing": { + "zipkin": { + "address": "zipkin.istio-system:9411" + } + } + }, + "SERVICE_ACCOUNT": "default", + "WORKLOAD_NAME": "echo-v1" + }, + "locality": {}, + "UserAgentVersionType": null + }, + "certificate_providers": { + "default": { + "plugin_name": "file_watcher", + "config": { + "certificate_file": "/var/lib/istio/data/cert-chain.pem", + "private_key_file": "/var/lib/istio/data/key.pem", + "ca_certificate_file": "/var/lib/istio/data/root-cert.pem", + "refresh_interval": "900s" + } + } + }, + "server_listener_resource_name_template": "xds.istio.io/grpc/lds/inbound/%s" +} \ No newline at end of file diff --git a/dubbo-demo/dubbo-demo-xds/dubbo-demo-xds-provider/src/main/resources/application.yml b/dubbo-demo/dubbo-demo-xds/dubbo-demo-xds-provider/src/main/resources/application.yml index 4d9b4bd788e..78ef823a606 100644 --- a/dubbo-demo/dubbo-demo-xds/dubbo-demo-xds-provider/src/main/resources/application.yml +++ b/dubbo-demo/dubbo-demo-xds/dubbo-demo-xds-provider/src/main/resources/application.yml @@ -26,4 +26,4 @@ dubbo: name: tri port: 50051 registry: - address: xds://istiod.istio-system.svc:15010?security=plaintext # istio://istiod.istio-system.svc:15012 + address: xds://47.251.12.148:15010?security=plaintext # istio://istiod.istio-system.svc:15012 diff --git a/dubbo-demo/dubbo-demo-xds/dubbo-demo-xds-provider/src/main/resources/bootstrap.json b/dubbo-demo/dubbo-demo-xds/dubbo-demo-xds-provider/src/main/resources/bootstrap.json new file mode 100644 index 00000000000..54f47100d63 --- /dev/null +++ b/dubbo-demo/dubbo-demo-xds/dubbo-demo-xds-provider/src/main/resources/bootstrap.json @@ -0,0 +1,92 @@ +{ + "xds_servers": [ + { + "server_uri": "47.251.12.148:15010", + "channel_creds": [ + { + "type": "insecure" + } + ], + "server_features": [ + "xds_v3" + ] + } + ], + "node": { + "id": "sidecar~192.168.19.141~echo-v1-5764868574-whqs9.echo-grpc~echo-grpc.svc.cluster.local", + "metadata": { + "ANNOTATIONS": { + "inject.istio.io/templates": "grpc-agent", + "istio.io/rev": "default", + "kubectl.kubernetes.io/default-container": "app", + "kubectl.kubernetes.io/default-logs-container": "app", + "kubernetes.io/config.seen": "2024-07-02T17:22:26.354582057+08:00", + "kubernetes.io/config.source": "api", + "prometheus.io/path": "/stats/prometheus", + "prometheus.io/port": "15020", + "prometheus.io/scrape": "true", + "proxy.istio.io/config": "{\"holdApplicationUntilProxyStarts\": true}", + "proxy.istio.io/overrides": "{\"containers\":[{\"name\":\"app\",\"image\":\"gcr.io/istio-testing/app:latest\",\"args\":[\"--metrics=15014\",\"--port\",\"18080\",\"--tcp\",\"19090\",\"--xds-grpc-server=17070\",\"--grpc\",\"17070\",\"--grpc\",\"17171\",\"--port\",\"3333\",\"--port\",\"8080\",\"--version\",\"v1\",\"--crt=/cert.crt\",\"--key=/cert.key\"],\"ports\":[{\"containerPort\":17070,\"protocol\":\"TCP\"},{\"containerPort\":17171,\"protocol\":\"TCP\"},{\"containerPort\":8080,\"protocol\":\"TCP\"},{\"name\":\"tcp-health-port\",\"containerPort\":3333,\"protocol\":\"TCP\"}],\"env\":[{\"name\":\"INSTANCE_IP\",\"valueFrom\":{\"fieldRef\":{\"apiVersion\":\"v1\",\"fieldPath\":\"status.podIP\"}}}],\"resources\":{},\"volumeMounts\":[{\"name\":\"kube-api-access-4qkzb\",\"readOnly\":true,\"mountPath\":\"/var/run/secrets/kubernetes.io/serviceaccount\"}],\"livenessProbe\":{\"tcpSocket\":{\"port\":\"tcp-health-port\"},\"initialDelaySeconds\":10,\"timeoutSeconds\":1,\"periodSeconds\":10,\"successThreshold\":1,\"failureThreshold\":10},\"readinessProbe\":{\"httpGet\":{\"path\":\"/\",\"port\":8080,\"scheme\":\"HTTP\"},\"initialDelaySeconds\":1,\"timeoutSeconds\":1,\"periodSeconds\":2,\"successThreshold\":1,\"failureThreshold\":10},\"startupProbe\":{\"tcpSocket\":{\"port\":\"tcp-health-port\"},\"timeoutSeconds\":1,\"periodSeconds\":10,\"successThreshold\":1,\"failureThreshold\":10},\"terminationMessagePath\":\"/dev/termination-log\",\"terminationMessagePolicy\":\"File\",\"imagePullPolicy\":\"Always\"}]}", + "sidecar.istio.io/rewriteAppHTTPProbers": "false", + "sidecar.istio.io/status": "{\"initContainers\":null,\"containers\":[\"istio-proxy\",\"app\"],\"volumes\":[\"workload-socket\",\"workload-certs\",\"istio-xds\",\"istio-data\",\"istio-podinfo\",\"istiod-ca-cert\"],\"imagePullSecrets\":null,\"revision\":\"default\"}" + }, + "APP_CONTAINERS": "app", + "CLUSTER_ID": "Kubernetes", + "ENVOY_PROMETHEUS_PORT": 15090, + "ENVOY_STATUS_PORT": 15021, + "GENERATOR": "grpc", + "INSTANCE_IPS": "192.168.19.141", + "ISTIO_PROXY_SHA": "7b292c7175692c822148b64005a731eb00365508", + "ISTIO_VERSION": "1.20.2", + "LABELS": { + "app": "echo", + "service.istio.io/canonical-name": "echo", + "service.istio.io/canonical-revision": "v1", + "version": "v1" + }, + "MESH_ID": "cluster.local", + "NAME": "echo-v1-5859d7bc7d-wlb2d", + "NAMESPACE": "echo-grpc", + "NODE_NAME": "us-west-1.192.168.19.107", + "OWNER": "kubernetes://apis/apps/v1/namespaces/echo-grpc/deployments/echo-v1", + "PILOT_SAN": [ + "istiod.istio-system.svc" + ], + "POD_PORTS": "[{\"containerPort\":17070,\"protocol\":\"TCP\"},{\"containerPort\":17171,\"protocol\":\"TCP\"},{\"containerPort\":8080,\"protocol\":\"TCP\"},{\"name\":\"tcp-health-port\",\"containerPort\":3333,\"protocol\":\"TCP\"}]", + "PROXY_CONFIG": { + "binaryPath": "/usr/local/bin/envoy", + "configPath": "./etc/istio/proxy", + "controlPlaneAuthPolicy": "MUTUAL_TLS", + "discoveryAddress": "istiod.istio-system.svc:15012", + "drainDuration": "45s", + "holdApplicationUntilProxyStarts": true, + "proxyAdminPort": 15000, + "serviceCluster": "istio-proxy", + "statNameLength": 189, + "statusPort": 15020, + "terminationDrainDuration": "5s", + "tracing": { + "zipkin": { + "address": "zipkin.istio-system:9411" + } + } + }, + "SERVICE_ACCOUNT": "default", + "WORKLOAD_NAME": "echo-v1" + }, + "locality": {}, + "UserAgentVersionType": null + }, + "certificate_providers": { + "default": { + "plugin_name": "file_watcher", + "config": { + "certificate_file": "/var/lib/istio/data/cert-chain.pem", + "private_key_file": "/var/lib/istio/data/key.pem", + "ca_certificate_file": "/var/lib/istio/data/root-cert.pem", + "refresh_interval": "900s" + } + } + }, + "server_listener_resource_name_template": "xds.istio.io/grpc/lds/inbound/%s" +} \ No newline at end of file diff --git a/dubbo-remoting/dubbo-remoting-http3/src/main/java/org/apache/dubbo/remoting/http3/Http3SslContexts.java b/dubbo-remoting/dubbo-remoting-http3/src/main/java/org/apache/dubbo/remoting/http3/Http3SslContexts.java index 950fbcad7db..f8ad728b886 100644 --- a/dubbo-remoting/dubbo-remoting-http3/src/main/java/org/apache/dubbo/remoting/http3/Http3SslContexts.java +++ b/dubbo-remoting/dubbo-remoting-http3/src/main/java/org/apache/dubbo/remoting/http3/Http3SslContexts.java @@ -65,7 +65,7 @@ public static QuicSslContext buildServerSslContext(URL url) { toX509Certificates(keyCertChainIn)); try (InputStream trustCertIn = cert.getTrustCertInputStream()) { if (trustCertIn != null) { - ClientAuth clientAuth = cert.getAuthPolicy() == AuthPolicy.CLIENT_AUTH + ClientAuth clientAuth = cert.getAuthPolicy() == AuthPolicy.CLIENT_AUTH_STRICT ? ClientAuth.REQUIRE : ClientAuth.OPTIONAL; builder.trustManager(toX509Certificates(trustCertIn)).clientAuth(clientAuth); diff --git a/dubbo-xds/src/main/java/org/apache/dubbo/xds/AdsObserver.java b/dubbo-xds/src/main/java/org/apache/dubbo/xds/AdsObserver.java index ac20bb2ddc8..f4758e78df4 100644 --- a/dubbo-xds/src/main/java/org/apache/dubbo/xds/AdsObserver.java +++ b/dubbo-xds/src/main/java/org/apache/dubbo/xds/AdsObserver.java @@ -21,21 +21,30 @@ import org.apache.dubbo.common.logger.LoggerFactory; import org.apache.dubbo.common.threadpool.manager.FrameworkExecutorRepository; import org.apache.dubbo.rpc.model.ApplicationModel; -import org.apache.dubbo.xds.protocol.AbstractProtocol; +import org.apache.dubbo.xds.resource.XdsResourceType; +import org.apache.dubbo.xds.resource.update.ResourceUpdate; +import org.apache.dubbo.xds.resource.update.ValidatedResourceUpdate; +import java.util.HashMap; +import java.util.HashSet; import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; import io.envoyproxy.envoy.config.core.v3.Node; import io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest; import io.envoyproxy.envoy.service.discovery.v3.DiscoveryResponse; import io.grpc.stub.StreamObserver; +import static org.apache.dubbo.common.constants.LoggerCodeConstants.REGISTRY_ERROR_PARSING_XDS; import static org.apache.dubbo.common.constants.LoggerCodeConstants.REGISTRY_ERROR_REQUEST_XDS; public class AdsObserver { @@ -45,13 +54,14 @@ public class AdsObserver { private final Node node; private volatile XdsChannel xdsChannel; - private final Map listeners = new ConcurrentHashMap<>(); + private final Map, ConcurrentMap> rawResourceListeners = + new ConcurrentHashMap<>(); protected StreamObserver requestObserver; - private CompletableFuture future = new CompletableFuture<>(); + private final CompletableFuture future = new CompletableFuture<>(); - private final Map observedResources = new ConcurrentHashMap<>(); + private final Map> subscribedResourceTypeUrls = new HashMap<>(); public AdsObserver(URL url, Node node) { this.url = url; @@ -60,8 +70,68 @@ public AdsObserver(URL url, Node node) { this.applicationModel = url.getOrDefaultApplicationModel(); } - public void addListener(AbstractProtocol protocol) { - listeners.put(protocol.getTypeUrl(), protocol); + public boolean hasSubscribed(XdsResourceType type) { + return subscribedResourceTypeUrls.containsKey(type.typeUrl()); + } + + public void saveSubscribedType(XdsResourceType type) { + subscribedResourceTypeUrls.put(type.typeUrl(), type); + } + + @SuppressWarnings("unchecked") + public XdsRawResourceProtocol addListener( + String resourceName, XdsResourceType clusterResourceType) { + ConcurrentMap resourceListeners = + rawResourceListeners.computeIfAbsent(clusterResourceType, k -> new ConcurrentHashMap<>()); + return (XdsRawResourceProtocol) resourceListeners.computeIfAbsent( + resourceName, + k -> new XdsRawResourceProtocol<>(this, NodeBuilder.build(), clusterResourceType, applicationModel)); + } + + public void adjustResourceSubscription(XdsResourceType resourceType) { + this.request(buildDiscoveryRequest(resourceType, getResourcesToObserve(resourceType))); + } + + public Set getResourcesToObserve(XdsResourceType resourceType) { + Map listenerMap = + rawResourceListeners.getOrDefault(resourceType, new ConcurrentHashMap<>()); + Set resourceNames = new HashSet<>(); + for (Map.Entry entry : listenerMap.entrySet()) { + resourceNames.add(entry.getKey()); + } + return resourceNames; + } + + private void process( + XdsResourceType resourceTypeInstance, DiscoveryResponse response) { + ValidatedResourceUpdate validatedResourceUpdate = + resourceTypeInstance.parse(XdsResourceType.xdsResourceTypeArgs, response.getResourcesList()); + if (!validatedResourceUpdate.getErrors().isEmpty()) { + logger.error( + REGISTRY_ERROR_PARSING_XDS, + validatedResourceUpdate.getErrors().toArray()); + } + ConcurrentMap parsedResources = validatedResourceUpdate.getParsedResources().entrySet().stream() + .collect(Collectors.toConcurrentMap( + Entry::getKey, e -> e.getValue().getResourceUpdate())); + + Map resourceListenerMap = + rawResourceListeners.getOrDefault(resourceTypeInstance, new ConcurrentHashMap<>()); + for (Map.Entry entry : resourceListenerMap.entrySet()) { + String resourceName = entry.getKey(); + XdsRawResourceListener rawResourceListener = entry.getValue(); + if (parsedResources.containsKey(resourceName)) { + rawResourceListener.onResourceUpdate(parsedResources.get(resourceName)); + } + } + } + + protected DiscoveryRequest buildDiscoveryRequest(XdsResourceType resourceType, Set resourceNames) { + return DiscoveryRequest.newBuilder() + .setNode(node) + .setTypeUrl(resourceType.typeUrl()) + .addAllResourceNames(resourceNames) + .build(); } public void request(DiscoveryRequest discoveryRequest) { @@ -69,7 +139,6 @@ public void request(DiscoveryRequest discoveryRequest) { requestObserver = xdsChannel.createDeltaDiscoveryRequest(new ResponseObserver(this, future)); } requestObserver.onNext(discoveryRequest); - observedResources.put(discoveryRequest.getTypeUrl(), discoveryRequest); try { // TODO:This is to make the child thread receive the information. // Maybe Using CountDownLatch would be better @@ -87,11 +156,11 @@ public void request(DiscoveryRequest discoveryRequest) { } private static class ResponseObserver implements StreamObserver { - private AdsObserver adsObserver; + private final AdsObserver adsObserver; - private CompletableFuture future; + private final CompletableFuture future; - public ResponseObserver(AdsObserver adsObserver, CompletableFuture future) { + public ResponseObserver(AdsObserver adsObserver, CompletableFuture future) { this.adsObserver = adsObserver; this.future = future; } @@ -102,22 +171,23 @@ public void onNext(DiscoveryResponse discoveryResponse) { if (future != null) { future.complete(null); } - XdsListener xdsListener = adsObserver.listeners.get(discoveryResponse.getTypeUrl()); - xdsListener.process(discoveryResponse); - adsObserver.requestObserver.onNext(buildAck(discoveryResponse)); + + XdsResourceType resourceType = fromTypeUrl(discoveryResponse.getTypeUrl()); + + adsObserver.process(resourceType, discoveryResponse); + + adsObserver.requestObserver.onNext(buildAck(resourceType, discoveryResponse)); } - protected DiscoveryRequest buildAck(DiscoveryResponse response) { + protected DiscoveryRequest buildAck(XdsResourceType resourceType, DiscoveryResponse response) { + // for ACK return DiscoveryRequest.newBuilder() .setNode(adsObserver.node) .setTypeUrl(response.getTypeUrl()) .setVersionInfo(response.getVersionInfo()) .setResponseNonce(response.getNonce()) - .addAllResourceNames(adsObserver - .observedResources - .get(response.getTypeUrl()) - .getResourceNamesList()) + .addAllResourceNames(adsObserver.getResourcesToObserve(resourceType)) .build(); } @@ -132,6 +202,10 @@ public void onCompleted() { logger.info("xDS Client completed"); adsObserver.triggerReConnectTask(); } + + XdsResourceType fromTypeUrl(String typeUrl) { + return adsObserver.subscribedResourceTypeUrls.get(typeUrl); + } } private void triggerReConnectTask() { @@ -149,7 +223,8 @@ private void recover() { if (xdsChannel.getChannel() != null) { // Child thread not need to wait other child thread. requestObserver = xdsChannel.createDeltaDiscoveryRequest(new ResponseObserver(this, null)); - observedResources.values().forEach(requestObserver::onNext); + // FIXME, make sure recover all resource subscriptions. + // observedResources.values().forEach(requestObserver::onNext); return; } else { logger.error( diff --git a/dubbo-xds/src/main/java/org/apache/dubbo/xds/PilotExchanger.java b/dubbo-xds/src/main/java/org/apache/dubbo/xds/PilotExchanger.java index 2c55cce8321..ad341f0d254 100644 --- a/dubbo-xds/src/main/java/org/apache/dubbo/xds/PilotExchanger.java +++ b/dubbo-xds/src/main/java/org/apache/dubbo/xds/PilotExchanger.java @@ -18,122 +18,42 @@ import org.apache.dubbo.common.URL; import org.apache.dubbo.common.utils.ConcurrentHashSet; +import org.apache.dubbo.rpc.model.ApplicationModel; import org.apache.dubbo.xds.directory.XdsDirectory; -import org.apache.dubbo.xds.protocol.XdsResourceListener; -import org.apache.dubbo.xds.protocol.impl.CdsProtocol; -import org.apache.dubbo.xds.protocol.impl.EdsProtocol; -import org.apache.dubbo.xds.protocol.impl.LdsProtocol; -import org.apache.dubbo.xds.protocol.impl.RdsProtocol; -import org.apache.dubbo.xds.resource.route.VirtualHost; -import org.apache.dubbo.xds.resource.update.EdsUpdate; -import org.apache.dubbo.xds.resource.update.RdsUpdate; +import org.apache.dubbo.xds.resource.XdsResourceType; +import org.apache.dubbo.xds.resource.update.ResourceUpdate; -import java.util.Map; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; public class PilotExchanger { + private int pollingTimeout; + private ApplicationModel applicationModel; protected final AdsObserver adsObserver; - protected final LdsProtocol ldsProtocol; - - protected final RdsProtocol rdsProtocol; - - protected final EdsProtocol edsProtocol; - - protected final CdsProtocol cdsProtocol; - private final Set domainObserveRequest = new ConcurrentHashSet(); private static PilotExchanger GLOBAL_PILOT_EXCHANGER = null; - private static final Map xdsVirtualHostMap = new ConcurrentHashMap<>(); - - private static final Map xdsEndpointMap = new ConcurrentHashMap<>(); - - private final Map> rdsListeners = new ConcurrentHashMap<>(); - - private final Map> cdsListeners = new ConcurrentHashMap<>(); - protected PilotExchanger(URL url) { - int pollingTimeout = url.getParameter("pollingTimeout", 10); + this.pollingTimeout = url.getParameter("pollingTimeout", 10); adsObserver = new AdsObserver(url, NodeBuilder.build()); - - this.rdsProtocol = - new RdsProtocol(adsObserver, NodeBuilder.build(), pollingTimeout, url.getOrDefaultApplicationModel()); - this.edsProtocol = - new EdsProtocol(adsObserver, NodeBuilder.build(), pollingTimeout, url.getOrDefaultApplicationModel()); - this.ldsProtocol = - new LdsProtocol(adsObserver, NodeBuilder.build(), pollingTimeout, url.getOrDefaultApplicationModel()); - this.cdsProtocol = - new CdsProtocol(adsObserver, NodeBuilder.build(), pollingTimeout, url.getOrDefaultApplicationModel()); - - XdsResourceListener pilotRdsListener = xdsRouteConfigurations -> xdsRouteConfigurations.forEach( - xdsRouteConfiguration -> xdsRouteConfiguration.getVirtualHosts().forEach(virtualHost -> { - String serviceName = virtualHost.getDomains().get(0).split("\\.")[0]; - this.xdsVirtualHostMap.put(serviceName, virtualHost); - // when resource update, notify subscribers - if (rdsListeners.containsKey(serviceName)) { - for (XdsDirectory listener : rdsListeners.get(serviceName)) { - listener.onRdsChange(serviceName, virtualHost); - } - } - })); - - XdsResourceListener pilotEdsListener = edsUpdates -> { - edsUpdates.forEach(edsUpdate -> { - this.xdsEndpointMap.put(edsUpdate.getClusterName(), edsUpdate); - if (cdsListeners.containsKey(edsUpdate.getClusterName())) { - for (XdsDirectory listener : cdsListeners.get(edsUpdate.getClusterName())) { - listener.onEdsChange(edsUpdate.getClusterName(), edsUpdate); - } - } - }); - }; - - this.rdsProtocol.registerListen(pilotRdsListener); - this.edsProtocol.registerListen(pilotEdsListener); - // lds resources callback,listen to all rds resources in the callback function - this.ldsProtocol.registerListen(rdsProtocol.getLdsListener()); - this.cdsProtocol.registerListen(edsProtocol.getCdsListener()); - - // cds resources callback,listen to all cds resources in the callback function - this.cdsProtocol.subscribeClusters(); - this.ldsProtocol.subscribeListeners(); - } - - public static Map getXdsVirtualHostMap() { - return xdsVirtualHostMap; - } - - public static Map getXdsEndpointMap() { - return xdsEndpointMap; + this.applicationModel = url.getOrDefaultApplicationModel(); } - public void subscribeRds(String applicationName, XdsDirectory listener) { - rdsListeners.computeIfAbsent(applicationName, key -> new ConcurrentHashSet<>()); - rdsListeners.get(applicationName).add(listener); - if (xdsVirtualHostMap.containsKey(applicationName)) { - listener.onRdsChange(applicationName, this.xdsVirtualHostMap.get(applicationName)); + public void subscribeXdsResource( + String resourceName, XdsResourceType resourceType, XdsResourceListener resourceListener) { + if (!adsObserver.hasSubscribed(resourceType)) { + adsObserver.saveSubscribedType(resourceType); } - } - public void unSubscribeRds(String applicationName, XdsDirectory listener) { - rdsListeners.get(applicationName).remove(listener); - } - - public void subscribeCds(String clusterName, XdsDirectory listener) { - cdsListeners.computeIfAbsent(clusterName, key -> new ConcurrentHashSet<>()); - cdsListeners.get(clusterName).add(listener); - if (xdsEndpointMap.containsKey(clusterName)) { - listener.onEdsChange(clusterName, xdsEndpointMap.get(clusterName)); + XdsRawResourceProtocol xdsProtocol = adsObserver.addListener(resourceName, resourceType); + if (xdsProtocol != null) { + xdsProtocol.subscribeResource(resourceName, resourceType, resourceListener); } } - public void unSubscribeCds(String clusterName, XdsDirectory listener) { - cdsListeners.get(clusterName).remove(listener); - } + public void unSubscribeXdsResource(String clusterName, XdsDirectory listener) {} public static PilotExchanger initialize(URL url) { synchronized (PilotExchanger.class) { diff --git a/dubbo-xds/src/main/java/org/apache/dubbo/xds/XdsListener.java b/dubbo-xds/src/main/java/org/apache/dubbo/xds/XdsRawResourceListener.java similarity index 82% rename from dubbo-xds/src/main/java/org/apache/dubbo/xds/XdsListener.java rename to dubbo-xds/src/main/java/org/apache/dubbo/xds/XdsRawResourceListener.java index fcd8d65b846..95549f0efa2 100644 --- a/dubbo-xds/src/main/java/org/apache/dubbo/xds/XdsListener.java +++ b/dubbo-xds/src/main/java/org/apache/dubbo/xds/XdsRawResourceListener.java @@ -16,8 +16,8 @@ */ package org.apache.dubbo.xds; -import io.envoyproxy.envoy.service.discovery.v3.DiscoveryResponse; +import org.apache.dubbo.xds.resource.update.ResourceUpdate; -public interface XdsListener { - void process(DiscoveryResponse discoveryResponse); +public interface XdsRawResourceListener { + void onResourceUpdate(T resourceUpdate); } diff --git a/dubbo-xds/src/main/java/org/apache/dubbo/xds/XdsRawResourceProtocol.java b/dubbo-xds/src/main/java/org/apache/dubbo/xds/XdsRawResourceProtocol.java new file mode 100644 index 00000000000..7cd7bbb47f6 --- /dev/null +++ b/dubbo-xds/src/main/java/org/apache/dubbo/xds/XdsRawResourceProtocol.java @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.xds; + +import org.apache.dubbo.common.logger.ErrorTypeAwareLogger; +import org.apache.dubbo.common.logger.LoggerFactory; +import org.apache.dubbo.rpc.model.ApplicationModel; +import org.apache.dubbo.xds.resource.XdsResourceType; +import org.apache.dubbo.xds.resource.update.ResourceUpdate; + +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Consumer; +import java.util.stream.Collectors; + +import io.envoyproxy.envoy.config.core.v3.Node; + +public class XdsRawResourceProtocol implements XdsRawResourceListener { + + private static final ErrorTypeAwareLogger logger = + LoggerFactory.getErrorTypeAwareLogger(XdsRawResourceProtocol.class); + + protected AdsObserver adsObserver; + + protected final Node node; + + protected final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + + protected final ReentrantReadWriteLock.ReadLock readLock = lock.readLock(); + + protected final ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock(); + + protected Set observeResourcesName; + + public static final String emptyResourceName = "emptyResourcesName"; + private final ReentrantLock resourceLock = new ReentrantLock(); + + protected Map, List>>> consumerObserveMap = new ConcurrentHashMap<>(); + + public Map, List>>> getConsumerObserveMap() { + return consumerObserveMap; + } + + private XdsResourceType resourceTypeInstance; + + protected volatile T resourceUpdate; + // serviceKey to watcher + protected volatile Map> resourceListeners = new ConcurrentHashMap<>(); + + protected ApplicationModel applicationModel; + + public XdsRawResourceProtocol( + AdsObserver adsObserver, Node node, XdsResourceType resourceType, ApplicationModel applicationModel) { + this.adsObserver = adsObserver; + this.node = node; + this.applicationModel = applicationModel; + this.resourceTypeInstance = resourceType; + } + + public String getTypeUrl() { + return resourceTypeInstance.typeUrl(); + } + + private void discoveryResponseListener(Map oldResult, Map newResult) { + Set changedResourceNames = new HashSet<>(); + oldResult.forEach((key, origin) -> { + if (!Objects.equals(origin, newResult.get(key))) { + changedResourceNames.add(key); + } + }); + newResult.forEach((key, origin) -> { + if (!Objects.equals(origin, oldResult.get(key))) { + changedResourceNames.add(key); + } + }); + if (changedResourceNames.isEmpty()) { + return; + } + + logger.info("Receive resource update notification from xds server. Change resource count: " + + changedResourceNames.stream() + ". Type: " + getTypeUrl()); + + // call once for full data + try { + readLock.lock(); + for (Map.Entry, List>>> entry : consumerObserveMap.entrySet()) { + if (entry.getKey().stream().noneMatch(changedResourceNames::contains)) { + // none update + continue; + } + + Map dsResultMap = + entry.getKey().stream().collect(Collectors.toMap(k -> k, v -> newResult.get(v))); + entry.getValue().forEach(o -> o.accept(dsResultMap)); + } + } finally { + readLock.unlock(); + } + } + + @Override + public void onResourceUpdate(T resourceUpdate) { + if (resourceUpdate == null) { + return; + } + + T oldData = this.resourceUpdate; + this.resourceUpdate = resourceUpdate; + + if (!Objects.equals(oldData, resourceUpdate)) { + resourceListeners.forEach((resourceName, listener) -> { + listener.onResourceUpdate(resourceUpdate); + }); + } + } + + public void subscribeResource( + String resourceName, XdsResourceType resourceType, XdsResourceListener listener) { + if (resourceName == null) { + return; + } + + XdsResourceListener existingListener = resourceListeners.putIfAbsent(resourceName, listener); + if (existingListener == null) { + // update resource subscription + adsObserver.adjustResourceSubscription(resourceType); + } else { + listener.onResourceUpdate(resourceUpdate); + } + } + + // + // public void subscribeResource(Set resourceNames) { + // resourceNames = resourceNames == null ? Collections.emptySet() : resourceNames; + // + // if (!resourceNames.isEmpty() && isCacheExistResource(resourceNames)) { + // getResourceFromCache(resourceNames); + // } else { + // getResourceFromRemote(resourceNames); + // } + // } + // + // private Map getResourceFromCache(Set resourceNames) { + // return resourceNames.stream() + // .filter(o -> !StringUtils.isEmpty(o)) + // .collect(Collectors.toMap(k -> k, this::getCacheResource)); + // } + // + // public Map getResourceFromRemote(Set resourceNames) { + // try { + // resourceLock.lock(); + // CompletableFuture> future = new CompletableFuture<>(); + // observeResourcesName = resourceNames; + // Set consumerObserveResourceNames = new HashSet<>(); + // if (resourceNames.isEmpty()) { + // consumerObserveResourceNames.add(emptyResourceName); + // } else { + // consumerObserveResourceNames = resourceNames; + // } + // + // Consumer> futureConsumer = future::complete; + // try { + // writeLock.lock(); + // ConcurrentHashMapUtils.computeIfAbsent( + // (ConcurrentHashMap, List>>>) + // consumerObserveMap, + // consumerObserveResourceNames, + // key -> new ArrayList<>()) + // .add(futureConsumer); + // } finally { + // writeLock.unlock(); + // } + // + // Set resourceNamesToObserve = new HashSet<>(resourceNames); + // resourceNamesToObserve.addAll(resourcesMap.keySet()); + // adsObserver.request(buildDiscoveryRequest(resourceNamesToObserve)); + // logger.info("Send xDS Observe request to remote. Resource count: " + resourceNamesToObserve.size() + // + ". Resource Type: " + getTypeUrl()); + // } finally { + // resourceLock.unlock(); + // } + // return Collections.emptyMap(); + // } + + // public boolean isCacheExistResource(Set resourceNames) { + // for (String resourceName : resourceNames) { + // if ("".equals(resourceName)) { + // continue; + // } + // if (!resourcesMap.containsKey(resourceName)) { + // return false; + // } + // } + // return true; + // } + // + // public T getCacheResource(String resourceName) { + // if (resourceName == null || resourceName.length() == 0) { + // return null; + // } + // return resourcesMap.get(resourceName); + // } + +} diff --git a/dubbo-xds/src/main/java/org/apache/dubbo/xds/protocol/XdsResourceListener.java b/dubbo-xds/src/main/java/org/apache/dubbo/xds/XdsResourceListener.java similarity index 88% rename from dubbo-xds/src/main/java/org/apache/dubbo/xds/protocol/XdsResourceListener.java rename to dubbo-xds/src/main/java/org/apache/dubbo/xds/XdsResourceListener.java index fa9a4998f22..5dd7aa01239 100644 --- a/dubbo-xds/src/main/java/org/apache/dubbo/xds/protocol/XdsResourceListener.java +++ b/dubbo-xds/src/main/java/org/apache/dubbo/xds/XdsResourceListener.java @@ -14,11 +14,8 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.dubbo.xds.protocol; - -import java.util.List; +package org.apache.dubbo.xds; public interface XdsResourceListener { - - void onResourceUpdate(List resource); + void onResourceUpdate(T resource); } diff --git a/dubbo-xds/src/main/java/org/apache/dubbo/xds/directory/RoutingUtils.java b/dubbo-xds/src/main/java/org/apache/dubbo/xds/directory/RoutingUtils.java new file mode 100644 index 00000000000..9bdcc9c47cb --- /dev/null +++ b/dubbo-xds/src/main/java/org/apache/dubbo/xds/directory/RoutingUtils.java @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.xds.directory; + +import org.apache.dubbo.xds.resource.common.ThreadSafeRandom; +import org.apache.dubbo.xds.resource.matcher.FractionMatcher; +import org.apache.dubbo.xds.resource.matcher.HeaderMatcher; +import org.apache.dubbo.xds.resource.matcher.PathMatcher; +import org.apache.dubbo.xds.resource.route.RouteMatch; +import org.apache.dubbo.xds.resource.route.VirtualHost; + +import javax.annotation.Nullable; + +import java.util.List; +import java.util.Locale; + +import com.google.common.base.Joiner; +import io.grpc.Metadata; + +import static com.google.common.base.Preconditions.checkArgument; + +/** + * Utilities for performing virtual host domain name matching and route matching. + */ +public final class RoutingUtils { + // Prevent instantiation. + private RoutingUtils() {} + + /** + * Returns the {@link VirtualHost} with the best match domain for the given hostname. + */ + @Nullable + static VirtualHost findVirtualHostForHostName(List virtualHosts, String hostName) { + // Domain search order: + // 1. Exact domain names: ``www.foo.com``. + // 2. Suffix domain wildcards: ``*.foo.com`` or ``*-bar.foo.com``. + // 3. Prefix domain wildcards: ``foo.*`` or ``foo-*``. + // 4. Special wildcard ``*`` matching any domain. + // + // The longest wildcards match first. + // Assuming only a single virtual host in the entire route configuration can match + // on ``*`` and a domain must be unique across all virtual hosts. + int matchingLen = -1; // longest length of wildcard pattern that matches host name + boolean exactMatchFound = false; // true if a virtual host with exactly matched domain found + VirtualHost targetVirtualHost = null; // target VirtualHost with longest matched domain + for (VirtualHost vHost : virtualHosts) { + for (String domain : vHost.getDomains()) { + boolean selected = false; + if (matchHostName(hostName, domain)) { // matching + if (!domain.contains("*")) { // exact matching + exactMatchFound = true; + targetVirtualHost = vHost; + break; + } else if (domain.length() > matchingLen) { // longer matching pattern + selected = true; + } else if (domain.length() == matchingLen && domain.startsWith("*")) { // suffix matching + selected = true; + } + } + if (selected) { + matchingLen = domain.length(); + targetVirtualHost = vHost; + } + } + if (exactMatchFound) { + break; + } + } + return targetVirtualHost; + } + + /** + * Returns {@code true} iff {@code hostName} matches the domain name {@code pattern} with + * case-insensitive. + * + *

Wildcard pattern rules: + *

    + *
  1. A single asterisk (*) matches any domain.
  2. + *
  3. Asterisk (*) is only permitted in the left-most or the right-most part of the pattern, + * but not both.
  4. + *
+ */ + private static boolean matchHostName(String hostName, String pattern) { + checkArgument( + hostName.length() != 0 && !hostName.startsWith(".") && !hostName.endsWith("."), "Invalid host name"); + checkArgument( + pattern.length() != 0 && !pattern.startsWith(".") && !pattern.endsWith("."), + "Invalid pattern/domain name"); + + hostName = hostName.toLowerCase(Locale.US); + pattern = pattern.toLowerCase(Locale.US); + // hostName and pattern are now in lower case -- domain names are case-insensitive. + + if (!pattern.contains("*")) { + // Not a wildcard pattern -- hostName and pattern must match exactly. + return hostName.equals(pattern); + } + // Wildcard pattern + + if (pattern.length() == 1) { + return true; + } + + int index = pattern.indexOf('*'); + + // At most one asterisk (*) is allowed. + if (pattern.indexOf('*', index + 1) != -1) { + return false; + } + + // Asterisk can only match prefix or suffix. + if (index != 0 && index != pattern.length() - 1) { + return false; + } + + // HostName must be at least as long as the pattern because asterisk has to + // match one or more characters. + if (hostName.length() < pattern.length()) { + return false; + } + + if (index == 0 && hostName.endsWith(pattern.substring(1))) { + // Prefix matching fails. + return true; + } + + // Pattern matches hostname if suffix matching succeeds. + return index == pattern.length() - 1 && hostName.startsWith(pattern.substring(0, pattern.length() - 1)); + } + + /** + * Returns {@code true} iff the given {@link RouteMatch} matches the RPC's full method name and + * headers. + */ + static boolean matchRoute(RouteMatch routeMatch, String fullMethodName, Metadata headers, ThreadSafeRandom random) { + if (!matchPath(routeMatch.getPathMatcher(), fullMethodName)) { + return false; + } + for (HeaderMatcher headerMatcher : routeMatch.getHeaderMatchers()) { + if (!headerMatcher.matches(getHeaderValue(headers, headerMatcher.name()))) { + return false; + } + } + FractionMatcher fraction = routeMatch.getFractionMatcher(); + return fraction == null || random.nextInt(fraction.getDenominator()) < fraction.getNumerator(); + } + + private static boolean matchPath(PathMatcher pathMatcher, String fullMethodName) { + if (pathMatcher.getPath() != null) { + return pathMatcher.isCaseSensitive() + ? pathMatcher.getPath().equals(fullMethodName) + : pathMatcher.getPath().equalsIgnoreCase(fullMethodName); + } else if (pathMatcher.getPrefix() != null) { + return pathMatcher.isCaseSensitive() + ? fullMethodName.startsWith(pathMatcher.getPrefix()) + : fullMethodName + .toLowerCase(Locale.US) + .startsWith(pathMatcher.getPrefix().toLowerCase(Locale.US)); + } + return pathMatcher.getRegEx().matches(fullMethodName); + } + + @Nullable + private static String getHeaderValue(Metadata headers, String headerName) { + if (headerName.endsWith(Metadata.BINARY_HEADER_SUFFIX)) { + return null; + } + if (headerName.equals("content-type")) { + return "application/grpc"; + } + Metadata.Key key; + try { + key = Metadata.Key.of(headerName, Metadata.ASCII_STRING_MARSHALLER); + } catch (IllegalArgumentException e) { + return null; + } + Iterable values = headers.getAll(key); + return values == null ? null : Joiner.on(",").join(values); + } +} diff --git a/dubbo-xds/src/main/java/org/apache/dubbo/xds/directory/XdsDirectory.java b/dubbo-xds/src/main/java/org/apache/dubbo/xds/directory/XdsDirectory.java index e4c121e479e..441b8080db5 100644 --- a/dubbo-xds/src/main/java/org/apache/dubbo/xds/directory/XdsDirectory.java +++ b/dubbo-xds/src/main/java/org/apache/dubbo/xds/directory/XdsDirectory.java @@ -19,6 +19,7 @@ import org.apache.dubbo.common.URL; import org.apache.dubbo.common.logger.ErrorTypeAwareLogger; import org.apache.dubbo.common.logger.LoggerFactory; +import org.apache.dubbo.common.url.component.URLAddress; import org.apache.dubbo.common.utils.CollectionUtils; import org.apache.dubbo.rpc.Invocation; import org.apache.dubbo.rpc.Invoker; @@ -28,20 +29,43 @@ import org.apache.dubbo.rpc.cluster.directory.AbstractDirectory; import org.apache.dubbo.rpc.cluster.router.state.BitList; import org.apache.dubbo.xds.PilotExchanger; +import org.apache.dubbo.xds.XdsResourceListener; +import org.apache.dubbo.xds.directory.XdsDirectory.LdsUpdateWatcher.RdsUpdateWatcher; +import org.apache.dubbo.xds.resource.XdsClusterResource; +import org.apache.dubbo.xds.resource.XdsListenerResource; +import org.apache.dubbo.xds.resource.XdsRouteConfigureResource; +import org.apache.dubbo.xds.resource.cluster.OutlierDetection; +import org.apache.dubbo.xds.resource.common.Locality; +import org.apache.dubbo.xds.resource.endpoint.DropOverload; import org.apache.dubbo.xds.resource.endpoint.LbEndpoint; +import org.apache.dubbo.xds.resource.endpoint.LocalityLbEndpoints; +import org.apache.dubbo.xds.resource.filter.NamedFilterConfig; +import org.apache.dubbo.xds.resource.listener.HttpConnectionManager; +import org.apache.dubbo.xds.resource.listener.security.UpstreamTlsContext; import org.apache.dubbo.xds.resource.route.ClusterWeight; import org.apache.dubbo.xds.resource.route.Route; import org.apache.dubbo.xds.resource.route.RouteAction; import org.apache.dubbo.xds.resource.route.VirtualHost; +import org.apache.dubbo.xds.resource.update.CdsUpdate; +import org.apache.dubbo.xds.resource.update.CdsUpdate.ClusterType; import org.apache.dubbo.xds.resource.update.EdsUpdate; +import org.apache.dubbo.xds.resource.update.LdsUpdate; +import org.apache.dubbo.xds.resource.update.RdsUpdate; +import javax.annotation.Nullable; + +import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.Set; +import java.util.TreeMap; import java.util.concurrent.ConcurrentHashMap; -import java.util.stream.Collectors; + +import com.google.common.collect.Sets; public class XdsDirectory extends AbstractDirectory { @@ -63,6 +87,11 @@ public class XdsDirectory extends AbstractDirectory { private static ErrorTypeAwareLogger logger = LoggerFactory.getErrorTypeAwareLogger(XdsDirectory.class); + private Map ldsWatchers = new HashMap<>(); + private Map rdsWatchers = new HashMap<>(); + private Map cdsWatchers = new HashMap<>(); + private Map edsWatchers = new HashMap<>(); + public XdsDirectory(Directory directory) { super(directory.getUrl(), null, true, directory.getConsumerUrl()); this.serviceType = directory.getInterface(); @@ -76,7 +105,9 @@ public XdsDirectory(Directory directory) { // subscribe resource for (String applicationName : applicationNames) { - pilotExchanger.subscribeRds(applicationName, this); + LdsUpdateWatcher ldsUpdateWatcher = new LdsUpdateWatcher(applicationName); + ldsWatchers.putIfAbsent(applicationName, ldsUpdateWatcher); + pilotExchanger.subscribeXdsResource(applicationName, XdsListenerResource.getInstance(), ldsUpdateWatcher); } } @@ -112,13 +143,6 @@ public List> getAllInvokers() { return super.getInvokers(); } - public void onRdsChange(String applicationName, VirtualHost xdsVirtualHost) { - Set oldCluster = getAllCluster(); - xdsVirtualHostMap.put(applicationName, xdsVirtualHost); - Set newCluster = getAllCluster(); - changeClusterSubscribe(oldCluster, newCluster); - } - private Set getAllCluster() { if (CollectionUtils.isEmptyMap(xdsVirtualHostMap)) { return new HashSet<>(); @@ -139,53 +163,283 @@ private Set getAllCluster() { return clusters; } - private void changeClusterSubscribe(Set oldCluster, Set newCluster) { - Set removeSubscribe = new HashSet<>(oldCluster); - Set addSubscribe = new HashSet<>(newCluster); + @Override + public boolean isAvailable() { + return true; + } + + @Override + public void destroy() { + super.destroy(); + // + // pilotExchanger.unSubscribeXdsResource(resourceName, this); + } + + public class LdsUpdateWatcher implements XdsResourceListener { + private final String ldsResourceName; + + @Nullable + private Set existingClusters; // clusters to which new requests can be routed - removeSubscribe.removeAll(newCluster); - addSubscribe.removeAll(oldCluster); + @Nullable + private RdsUpdateWatcher rdsUpdateWatcher; - // remove subscribe cluster - for (String cluster : removeSubscribe) { - pilotExchanger.unSubscribeCds(cluster, this); - xdsEndpointMap.remove(cluster); - // TODO: delete invokers which belong unsubscribed cluster + public LdsUpdateWatcher(String ldsResourceName) { + this.ldsResourceName = ldsResourceName; } - // add subscribe cluster - for (String cluster : addSubscribe) { - pilotExchanger.subscribeCds(cluster, this); + + @Override + public void onResourceUpdate(LdsUpdate update) { + HttpConnectionManager httpConnectionManager = update.getHttpConnectionManager(); + List virtualHosts = httpConnectionManager.getVirtualHosts(); + String rdsName = httpConnectionManager.getRdsName(); + + if (virtualHosts != null) { + updateRoutes( + virtualHosts, + httpConnectionManager.getHttpMaxStreamDurationNano(), + httpConnectionManager.getHttpFilterConfigs()); + } else { + rdsUpdateWatcher = new RdsUpdateWatcher( + rdsName, + httpConnectionManager.getHttpMaxStreamDurationNano(), + httpConnectionManager.getHttpFilterConfigs()); + rdsWatchers.putIfAbsent(rdsName, rdsUpdateWatcher); + pilotExchanger.subscribeXdsResource(rdsName, XdsRouteConfigureResource.getInstance(), rdsUpdateWatcher); + } + } + + private void updateRoutes( + List virtualHosts, + long httpMaxStreamDurationNano, + @Nullable List filterConfigs) { + // String authority = overrideAuthority != null ? overrideAuthority : ldsResourceName; + String authority = ldsResourceName; + VirtualHost virtualHost = RoutingUtils.findVirtualHostForHostName(virtualHosts, authority); + if (virtualHost == null) { + return; + } + + List routes = virtualHost.getRoutes(); + + // Populate all clusters to which requests can be routed to through the virtual host. + Set clusters = new HashSet<>(); + // uniqueName -> clusterName + Map clusterNameMap = new HashMap<>(); + for (Route route : routes) { + RouteAction action = route.getRouteAction(); + String clusterName; + if (action != null) { + if (action.getCluster() != null) { + clusterName = action.getCluster(); + clusters.add(clusterName); + clusterNameMap.put(clusterName, action.getCluster()); + } else if (action.getWeightedClusters() != null) { + for (ClusterWeight weighedCluster : action.getWeightedClusters()) { + clusterName = weighedCluster.getName(); + clusters.add(clusterName); + clusterNameMap.put(clusterName, weighedCluster.getName()); + } + } + } + } + + boolean shouldUpdateResult = existingClusters == null; + Set addedClusters = + existingClusters == null ? clusters : Sets.difference(clusters, existingClusters); + Set deletedClusters = + existingClusters == null ? Collections.emptySet() : Sets.difference(existingClusters, clusters); + existingClusters = clusters; + for (String cluster : addedClusters) { + CdsUpdateNodeDirectory cdsUpdateWatcher = new CdsUpdateNodeDirectory(); + cdsWatchers.putIfAbsent(cluster, cdsUpdateWatcher); + pilotExchanger.subscribeXdsResource(cluster, XdsClusterResource.getInstance(), cdsUpdateWatcher); + } + } + + public class RdsUpdateWatcher implements XdsResourceListener { + private String rdsName; + + private final long httpMaxStreamDurationNano; + + @Nullable + private final List filterConfigs; + + public RdsUpdateWatcher( + String rdsName, long httpMaxStreamDurationNano, @Nullable List filterConfigs) { + this.rdsName = rdsName; + this.httpMaxStreamDurationNano = httpMaxStreamDurationNano; + this.filterConfigs = filterConfigs; + } + + @Override + public void onResourceUpdate(RdsUpdate update) { + if (RdsUpdateWatcher.this != rdsUpdateWatcher) { + return; + } + updateRoutes(update.getVirtualHosts(), httpMaxStreamDurationNano, filterConfigs); + } } } - public void onEdsChange(String clusterName, EdsUpdate edsUpdate) { - xdsEndpointMap.put(clusterName, edsUpdate); - // String lbPolicy = xdsCluster.getLbPolicy(); - List xdsEndpoints = edsUpdate.getLocalityLbEndpointsMap().values().stream() - .flatMap(e -> e.getEndpoints().stream()) - .collect(Collectors.toList()); - BitList> invokers = new BitList<>(Collections.emptyList()); - xdsEndpoints.forEach(e -> { - String ip = e.getAddresses().getFirst().getAddress(); - int port = e.getAddresses().getFirst().getPort(); - URL url = new URL(this.protocolName, ip, port, this.serviceType.getName(), this.url.getParameters()); - // set cluster name - url = url.addParameter("clusterID", clusterName); - // set load balance policy - // url = url.addParameter("loadbalance", lbPolicy); - // cluster to invoker - Invoker invoker = this.protocol.refer(this.serviceType, url); - invokers.add(invoker); - }); - // TODO: Consider cases where some clients are not available - // super.getInvokers().addAll(invokers); - // TODO: Need add new api which can add invokers, because a XdsDirectory need monitor multi clusters. - super.setInvokers(invokers); - // xdsCluster.setInvokers(invokers); + /** + * This is the internal node of the Directory tree, which is responsible for creating invokers from clusters. + * + * Each invoker instance created in this should be representing a cluster pointing to another Directory instead of a specific instance invoker. + */ + public class CdsUpdateNodeDirectory implements XdsResourceListener { + @Override + public void onResourceUpdate(CdsUpdate update) { + // 啥都不干,就是把 aggregate logicalDns eds 三种做个分类处理,其中eds的不用做什么事情 + if (update.getClusterType() == ClusterType.AGGREGATE) { + String clusterName = update.getClusterName(); + for (String cluster : update.getPrioritizedClusterNames()) { + // create internal node directory. + } + } else if (update.getClusterType() == ClusterType.EDS) { + // create leaf directory. + } else { + + } + } } - @Override - public boolean isAvailable() { - return true; + /** + * This is the leaf node of the Directory tree, which is responsible for creating invokers from endpoints. + * + * Each invoker instance created in this should be representing a specific dubbo provider instance. + */ + public class EdsUpdateLeafDirectory implements XdsResourceListener { + private final String clusterName; + private final String edsResourceName; + + @Nullable + protected final Long maxConcurrentRequests; + + @Nullable + protected final UpstreamTlsContext tlsContext; + + @Nullable + protected final OutlierDetection outlierDetection; + + private Map localityPriorityNames = Collections.emptyMap(); + + int priorityNameGenId = 1; + + public EdsUpdateLeafDirectory( + String clusterName, + String edsResourceName, + @Nullable Long maxConcurrentRequests, + @Nullable UpstreamTlsContext tlsContext, + @Nullable OutlierDetection outlierDetection) { + this.clusterName = clusterName; + this.edsResourceName = edsResourceName; + this.maxConcurrentRequests = maxConcurrentRequests; + this.tlsContext = tlsContext; + this.outlierDetection = outlierDetection; + } + + @Override + public void onResourceUpdate(EdsUpdate update) { + Map localityLbEndpoints = update.getLocalityLbEndpointsMap(); + List dropOverloads = update.getDropPolicies(); + List addresses = new ArrayList<>(); + Map> prioritizedLocalityWeights = new HashMap<>(); + List sortedPriorityNames = generatePriorityNames(clusterName, localityLbEndpoints); + for (Locality locality : localityLbEndpoints.keySet()) { + LocalityLbEndpoints localityLbInfo = localityLbEndpoints.get(locality); + String priorityName = localityPriorityNames.get(locality); + boolean discard = true; + for (LbEndpoint endpoint : localityLbInfo.getEndpoints()) { + if (endpoint.isHealthy()) { + discard = false; + long weight = localityLbInfo.getLocalityWeight(); + if (endpoint.getLoadBalancingWeight() != 0) { + weight *= endpoint.getLoadBalancingWeight(); + } + addresses.add(endpoint.getAddresses().get(0)); + } + } + if (discard) { + logger.info("Discard locality {0} with 0 healthy endpoints", locality); + continue; + } + if (!prioritizedLocalityWeights.containsKey(priorityName)) { + prioritizedLocalityWeights.put(priorityName, new HashMap()); + } + prioritizedLocalityWeights.get(priorityName).put(locality, localityLbInfo.getLocalityWeight()); + } + + sortedPriorityNames.retainAll(prioritizedLocalityWeights.keySet()); + } + + private List generatePriorityNames( + String name, Map localityLbEndpoints) { + TreeMap> todo = new TreeMap<>(); + for (Locality locality : localityLbEndpoints.keySet()) { + int priority = localityLbEndpoints.get(locality).getPriority(); + if (!todo.containsKey(priority)) { + todo.put(priority, new ArrayList<>()); + } + todo.get(priority).add(locality); + } + Map newNames = new HashMap<>(); + Set usedNames = new HashSet<>(); + List ret = new ArrayList<>(); + for (Integer priority : todo.keySet()) { + String foundName = ""; + for (Locality locality : todo.get(priority)) { + if (localityPriorityNames.containsKey(locality) + && usedNames.add(localityPriorityNames.get(locality))) { + foundName = localityPriorityNames.get(locality); + break; + } + } + if ("".equals(foundName)) { + foundName = String.format(Locale.US, "%s[child%d]", name, priorityNameGenId++); + } + for (Locality locality : todo.get(priority)) { + newNames.put(locality, foundName); + } + ret.add(foundName); + } + localityPriorityNames = newNames; + return ret; + } } + + // + // public void onResourceUpdate(CdsUpdate cdsUpdate) { + // // for eds cluster, do nothing + // + // // for aggregate clusters, do subscription + // String clusterName = cdsUpdate.getClusterName(); + // this.pilotExchanger.subscribeCds(clusterName, this); + // } + // + // public void onResourceUpdate(String clusterName, EdsUpdate edsUpdate) { + // xdsEndpointMap.put(clusterName, edsUpdate); + // // String lbPolicy = xdsCluster.getLbPolicy(); + // List xdsEndpoints = edsUpdate.getLocalityLbEndpointsMap().values().stream() + // .flatMap(e -> e.getEndpoints().stream()) + // .collect(Collectors.toList()); + // BitList> invokers = new BitList<>(Collections.emptyList()); + // xdsEndpoints.forEach(e -> { + // String ip = e.getAddresses().get(0).getAddress(); + // int port = e.getAddresses().get(0).getPort(); + // URL url = new URL(this.protocolName, ip, port, this.serviceType.getName(), this.url.getParameters()); + // // set cluster name + // url = url.addParameter("clusterID", clusterName); + // // set load balance policy + // // url = url.addParameter("loadbalance", lbPolicy); + // // cluster to invoker + // Invoker invoker = this.protocol.refer(this.serviceType, url); + // invokers.add(invoker); + // }); + // // TODO: Consider cases where some clients are not available + // // super.getInvokers().addAll(invokers); + // // TODO: Need add new api which can add invokers, because a XdsDirectory need monitor multi clusters. + // super.setInvokers(invokers); + // // xdsCluster.setInvokers(invokers); + // } } diff --git a/dubbo-xds/src/main/java/org/apache/dubbo/xds/listener/CdsListener.java b/dubbo-xds/src/main/java/org/apache/dubbo/xds/listener/CdsListener.java index 2e875d6fc66..bc8498cbbb1 100644 --- a/dubbo-xds/src/main/java/org/apache/dubbo/xds/listener/CdsListener.java +++ b/dubbo-xds/src/main/java/org/apache/dubbo/xds/listener/CdsListener.java @@ -18,8 +18,11 @@ import org.apache.dubbo.common.extension.ExtensionScope; import org.apache.dubbo.common.extension.SPI; -import org.apache.dubbo.xds.protocol.XdsResourceListener; import org.apache.dubbo.xds.resource.update.CdsUpdate; +import java.util.List; + @SPI(scope = ExtensionScope.APPLICATION) -public interface CdsListener extends XdsResourceListener {} +public interface CdsListener { + void onResourceUpdate(List resource); +} diff --git a/dubbo-xds/src/main/java/org/apache/dubbo/xds/listener/LdsListener.java b/dubbo-xds/src/main/java/org/apache/dubbo/xds/listener/LdsListener.java index e73e652c645..f874e2e0866 100644 --- a/dubbo-xds/src/main/java/org/apache/dubbo/xds/listener/LdsListener.java +++ b/dubbo-xds/src/main/java/org/apache/dubbo/xds/listener/LdsListener.java @@ -18,8 +18,11 @@ import org.apache.dubbo.common.extension.ExtensionScope; import org.apache.dubbo.common.extension.SPI; -import org.apache.dubbo.xds.protocol.XdsResourceListener; import org.apache.dubbo.xds.resource.update.LdsUpdate; +import java.util.List; + @SPI(scope = ExtensionScope.APPLICATION) -public interface LdsListener extends XdsResourceListener {} +public interface LdsListener { + void onResourceUpdate(List resource); +} diff --git a/dubbo-xds/src/main/java/org/apache/dubbo/xds/listener/UpstreamTlsConfigListener.java b/dubbo-xds/src/main/java/org/apache/dubbo/xds/listener/UpstreamTlsConfigListener.java index 6380f365708..d5a6e0f5543 100644 --- a/dubbo-xds/src/main/java/org/apache/dubbo/xds/listener/UpstreamTlsConfigListener.java +++ b/dubbo-xds/src/main/java/org/apache/dubbo/xds/listener/UpstreamTlsConfigListener.java @@ -52,7 +52,6 @@ public UpstreamTlsConfigListener(ApplicationModel application) { this.tlsConfigRepository = application.getBeanFactory().getOrRegisterBean(XdsTlsConfigRepository.class); } - @Override public void onResourceUpdate(List resources) { Map configs = new ConcurrentHashMap<>(16); List clusters = diff --git a/dubbo-xds/src/main/java/org/apache/dubbo/xds/protocol/AbstractProtocol.java b/dubbo-xds/src/main/java/org/apache/dubbo/xds/protocol/AbstractProtocol.java deleted file mode 100644 index 7f04e1d8a5a..00000000000 --- a/dubbo-xds/src/main/java/org/apache/dubbo/xds/protocol/AbstractProtocol.java +++ /dev/null @@ -1,228 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.dubbo.xds.protocol; - -import org.apache.dubbo.common.logger.ErrorTypeAwareLogger; -import org.apache.dubbo.common.logger.LoggerFactory; -import org.apache.dubbo.common.utils.ConcurrentHashMapUtils; -import org.apache.dubbo.common.utils.StringUtils; -import org.apache.dubbo.rpc.model.ApplicationModel; -import org.apache.dubbo.xds.AdsObserver; -import org.apache.dubbo.xds.XdsListener; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Set; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.locks.ReentrantLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; -import java.util.function.Consumer; -import java.util.stream.Collectors; - -import io.envoyproxy.envoy.config.core.v3.Node; -import io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest; -import io.envoyproxy.envoy.service.discovery.v3.DiscoveryResponse; - -public abstract class AbstractProtocol implements XdsProtocol, XdsListener { - - private static final ErrorTypeAwareLogger logger = LoggerFactory.getErrorTypeAwareLogger(AbstractProtocol.class); - - protected AdsObserver adsObserver; - - protected final Node node; - - private final int checkInterval; - - protected final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); - - protected final ReentrantReadWriteLock.ReadLock readLock = lock.readLock(); - - protected final ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock(); - - protected Set observeResourcesName; - - public static final String emptyResourceName = "emptyResourcesName"; - private final ReentrantLock resourceLock = new ReentrantLock(); - - protected Map, List>>> consumerObserveMap = new ConcurrentHashMap<>(); - - public Map, List>>> getConsumerObserveMap() { - return consumerObserveMap; - } - - protected Map resourcesMap = new ConcurrentHashMap<>(); - - protected List> resourceListeners = new CopyOnWriteArrayList<>(); - - protected ApplicationModel applicationModel; - - public AbstractProtocol(AdsObserver adsObserver, Node node, int checkInterval, ApplicationModel applicationModel) { - this.adsObserver = adsObserver; - this.node = node; - this.checkInterval = checkInterval; - this.applicationModel = applicationModel; - adsObserver.addListener(this); - } - - public void registerListen(XdsResourceListener listener) { - this.resourceListeners.add(listener); - } - - /** - * Abstract method to obtain Type-URL from sub-class - * - * @return Type-URL of xDS - */ - public abstract String getTypeUrl(); - - public boolean isCacheExistResource(Set resourceNames) { - for (String resourceName : resourceNames) { - if ("".equals(resourceName)) { - continue; - } - if (!resourcesMap.containsKey(resourceName)) { - return false; - } - } - return true; - } - - public T getCacheResource(String resourceName) { - if (resourceName == null || resourceName.length() == 0) { - return null; - } - return resourcesMap.get(resourceName); - } - - @Override - public void subscribeResource(Set resourceNames) { - resourceNames = resourceNames == null ? Collections.emptySet() : resourceNames; - - if (!resourceNames.isEmpty() && isCacheExistResource(resourceNames)) { - getResourceFromCache(resourceNames); - } else { - getResourceFromRemote(resourceNames); - } - } - - private Map getResourceFromCache(Set resourceNames) { - return resourceNames.stream() - .filter(o -> !StringUtils.isEmpty(o)) - .collect(Collectors.toMap(k -> k, this::getCacheResource)); - } - - public Map getResourceFromRemote(Set resourceNames) { - try { - resourceLock.lock(); - CompletableFuture> future = new CompletableFuture<>(); - observeResourcesName = resourceNames; - Set consumerObserveResourceNames = new HashSet<>(); - if (resourceNames.isEmpty()) { - consumerObserveResourceNames.add(emptyResourceName); - } else { - consumerObserveResourceNames = resourceNames; - } - - Consumer> futureConsumer = future::complete; - try { - writeLock.lock(); - ConcurrentHashMapUtils.computeIfAbsent( - (ConcurrentHashMap, List>>>) consumerObserveMap, - consumerObserveResourceNames, - key -> new ArrayList<>()) - .add(futureConsumer); - } finally { - writeLock.unlock(); - } - - Set resourceNamesToObserve = new HashSet<>(resourceNames); - resourceNamesToObserve.addAll(resourcesMap.keySet()); - adsObserver.request(buildDiscoveryRequest(resourceNamesToObserve)); - logger.info("Send xDS Observe request to remote. Resource count: " + resourceNamesToObserve.size() - + ". Resource Type: " + getTypeUrl()); - } finally { - resourceLock.unlock(); - } - return Collections.emptyMap(); - } - - protected DiscoveryRequest buildDiscoveryRequest(Set resourceNames) { - return DiscoveryRequest.newBuilder() - .setNode(node) - .setTypeUrl(getTypeUrl()) - .addAllResourceNames(resourceNames) - .build(); - } - - // protected abstract Map decodeDiscoveryResponse(DiscoveryResponse response); - - protected abstract Map decodeDiscoveryResponse(DiscoveryResponse response); - - @Override - public final void process(DiscoveryResponse discoveryResponse) { - // Map newResult = decodeDiscoveryResponse(discoveryResponse); - Map oldResource = resourcesMap; - // discoveryResponseListener(oldResource, newResult); - - Map newResult = decodeDiscoveryResponse(discoveryResponse); - resourceListeners.forEach(l -> l.onResourceUpdate(new ArrayList<>(newResult.values()))); - resourcesMap = newResult; - } - - private void discoveryResponseListener(Map oldResult, Map newResult) { - Set changedResourceNames = new HashSet<>(); - oldResult.forEach((key, origin) -> { - if (!Objects.equals(origin, newResult.get(key))) { - changedResourceNames.add(key); - } - }); - newResult.forEach((key, origin) -> { - if (!Objects.equals(origin, oldResult.get(key))) { - changedResourceNames.add(key); - } - }); - if (changedResourceNames.isEmpty()) { - return; - } - - logger.info("Receive resource update notification from xds server. Change resource count: " - + changedResourceNames.stream() + ". Type: " + getTypeUrl()); - - // call once for full data - try { - readLock.lock(); - for (Map.Entry, List>>> entry : consumerObserveMap.entrySet()) { - if (entry.getKey().stream().noneMatch(changedResourceNames::contains)) { - // none update - continue; - } - - Map dsResultMap = - entry.getKey().stream().collect(Collectors.toMap(k -> k, v -> newResult.get(v))); - entry.getValue().forEach(o -> o.accept(dsResultMap)); - } - } finally { - readLock.unlock(); - } - } -} diff --git a/dubbo-xds/src/main/java/org/apache/dubbo/xds/protocol/XdsProtocol.java b/dubbo-xds/src/main/java/org/apache/dubbo/xds/protocol/XdsProtocol.java deleted file mode 100644 index 838988ad518..00000000000 --- a/dubbo-xds/src/main/java/org/apache/dubbo/xds/protocol/XdsProtocol.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.dubbo.xds.protocol; - -import java.util.Set; - -public interface XdsProtocol { - /** - * Gets all resources by the specified resource name. - * For LDS, the {@param resourceNames} is ignored - * - * @param resourceNames specified resource name - * @return resources, null if request failed - */ - void subscribeResource(Set resourceNames); - - /** - * Add a observer resource with {@link Consumer} - * - * @param resourceNames specified resource name - * @param consumer resource notifier, will be called when resource updated - * @return requestId, used when resourceNames update with {@link XdsProtocol#updateObserve(long, Set)} - */ - // void observeResource(Set resourceNames, Consumer> consumer, boolean isReConnect); -} diff --git a/dubbo-xds/src/main/java/org/apache/dubbo/xds/protocol/impl/CdsProtocol.java b/dubbo-xds/src/main/java/org/apache/dubbo/xds/protocol/impl/CdsProtocol.java deleted file mode 100644 index 33525699a45..00000000000 --- a/dubbo-xds/src/main/java/org/apache/dubbo/xds/protocol/impl/CdsProtocol.java +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.dubbo.xds.protocol.impl; - -import org.apache.dubbo.common.logger.ErrorTypeAwareLogger; -import org.apache.dubbo.common.logger.LoggerFactory; -import org.apache.dubbo.rpc.model.ApplicationModel; -import org.apache.dubbo.xds.AdsObserver; -import org.apache.dubbo.xds.listener.CdsListener; -import org.apache.dubbo.xds.protocol.AbstractProtocol; -import org.apache.dubbo.xds.resource.XdsClusterResource; -import org.apache.dubbo.xds.resource.XdsResourceType; -import org.apache.dubbo.xds.resource.update.CdsUpdate; -import org.apache.dubbo.xds.resource.update.ValidatedResourceUpdate; - -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; -import java.util.function.Consumer; -import java.util.stream.Collectors; - -import io.envoyproxy.envoy.config.core.v3.Node; -import io.envoyproxy.envoy.service.discovery.v3.DiscoveryResponse; - -import static org.apache.dubbo.common.constants.LoggerCodeConstants.REGISTRY_ERROR_PARSING_XDS; - -public class CdsProtocol extends AbstractProtocol { - private static final ErrorTypeAwareLogger logger = LoggerFactory.getErrorTypeAwareLogger(CdsProtocol.class); - - public void setUpdateCallback(Consumer> updateCallback) { - this.updateCallback = updateCallback; - } - - private static final XdsClusterResource xdsClusterResource = XdsClusterResource.getInstance(); - - private Consumer> updateCallback; - - public CdsProtocol(AdsObserver adsObserver, Node node, int checkInterval, ApplicationModel applicationModel) { - super(adsObserver, node, checkInterval, applicationModel); - List ldsListeners = - applicationModel.getExtensionLoader(CdsListener.class).getActivateExtensions(); - ldsListeners.forEach(this::registerListen); - } - - @Override - public String getTypeUrl() { - return "type.googleapis.com/envoy.config.cluster.v3.Cluster"; - } - - public void subscribeClusters() { - subscribeResource(null); - } - - @Override - protected Map decodeDiscoveryResponse(DiscoveryResponse response) { - if (!getTypeUrl().equals(response.getTypeUrl())) { - return Collections.emptyMap(); - } - ValidatedResourceUpdate validatedResourceUpdate = - xdsClusterResource.parse(XdsResourceType.xdsResourceTypeArgs, response.getResourcesList()); - if (!validatedResourceUpdate.getErrors().isEmpty()) { - logger.error( - REGISTRY_ERROR_PARSING_XDS, - validatedResourceUpdate.getErrors().toArray()); - } - return validatedResourceUpdate.getParsedResources().entrySet().stream() - .collect(Collectors.toConcurrentMap( - Entry::getKey, e -> e.getValue().getResourceUpdate())); - } -} diff --git a/dubbo-xds/src/main/java/org/apache/dubbo/xds/protocol/impl/EdsProtocol.java b/dubbo-xds/src/main/java/org/apache/dubbo/xds/protocol/impl/EdsProtocol.java deleted file mode 100644 index 11314fc0137..00000000000 --- a/dubbo-xds/src/main/java/org/apache/dubbo/xds/protocol/impl/EdsProtocol.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.dubbo.xds.protocol.impl; - -import org.apache.dubbo.common.logger.ErrorTypeAwareLogger; -import org.apache.dubbo.common.logger.LoggerFactory; -import org.apache.dubbo.rpc.model.ApplicationModel; -import org.apache.dubbo.xds.AdsObserver; -import org.apache.dubbo.xds.protocol.AbstractProtocol; -import org.apache.dubbo.xds.protocol.XdsResourceListener; -import org.apache.dubbo.xds.resource.XdsEndpointResource; -import org.apache.dubbo.xds.resource.XdsResourceType; -import org.apache.dubbo.xds.resource.update.CdsUpdate; -import org.apache.dubbo.xds.resource.update.EdsUpdate; -import org.apache.dubbo.xds.resource.update.ValidatedResourceUpdate; - -import java.util.Collections; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; -import java.util.stream.Collectors; - -import io.envoyproxy.envoy.config.core.v3.Node; -import io.envoyproxy.envoy.service.discovery.v3.DiscoveryResponse; - -import static org.apache.dubbo.common.constants.LoggerCodeConstants.REGISTRY_ERROR_PARSING_XDS; - -public class EdsProtocol extends AbstractProtocol { - - private static final ErrorTypeAwareLogger logger = LoggerFactory.getErrorTypeAwareLogger(EdsProtocol.class); - - private static final XdsEndpointResource xdsEndpointResource = XdsEndpointResource.getInstance(); - - private XdsResourceListener clusterListener = clusters -> { - Set clusterNames = - clusters.stream().map(CdsUpdate::getClusterName).collect(Collectors.toSet()); - this.subscribeResource(clusterNames); - }; - - public EdsProtocol(AdsObserver adsObserver, Node node, int checkInterval, ApplicationModel applicationModel) { - super(adsObserver, node, checkInterval, applicationModel); - } - - @Override - public String getTypeUrl() { - return "type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment"; - } - - public XdsResourceListener getCdsListener() { - return clusterListener; - } - - @Override - protected Map decodeDiscoveryResponse(DiscoveryResponse response) { - if (!getTypeUrl().equals(response.getTypeUrl())) { - return Collections.emptyMap(); - } - ValidatedResourceUpdate validatedResourceUpdate = - xdsEndpointResource.parse(XdsResourceType.xdsResourceTypeArgs, response.getResourcesList()); - if (!validatedResourceUpdate.getErrors().isEmpty()) { - logger.error( - REGISTRY_ERROR_PARSING_XDS, - validatedResourceUpdate.getErrors().toArray()); - } - return validatedResourceUpdate.getParsedResources().entrySet().stream() - .collect(Collectors.toConcurrentMap( - Entry::getKey, e -> e.getValue().getResourceUpdate())); - } -} diff --git a/dubbo-xds/src/main/java/org/apache/dubbo/xds/protocol/impl/LdsProtocol.java b/dubbo-xds/src/main/java/org/apache/dubbo/xds/protocol/impl/LdsProtocol.java deleted file mode 100644 index bc8cd0c5480..00000000000 --- a/dubbo-xds/src/main/java/org/apache/dubbo/xds/protocol/impl/LdsProtocol.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.dubbo.xds.protocol.impl; - -import org.apache.dubbo.common.logger.ErrorTypeAwareLogger; -import org.apache.dubbo.common.logger.LoggerFactory; -import org.apache.dubbo.rpc.model.ApplicationModel; -import org.apache.dubbo.xds.AdsObserver; -import org.apache.dubbo.xds.listener.LdsListener; -import org.apache.dubbo.xds.protocol.AbstractProtocol; -import org.apache.dubbo.xds.resource.XdsListenerResource; -import org.apache.dubbo.xds.resource.XdsResourceType; -import org.apache.dubbo.xds.resource.update.LdsUpdate; -import org.apache.dubbo.xds.resource.update.ValidatedResourceUpdate; - -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.stream.Collectors; - -import io.envoyproxy.envoy.config.core.v3.Node; -import io.envoyproxy.envoy.service.discovery.v3.DiscoveryResponse; - -import static org.apache.dubbo.common.constants.LoggerCodeConstants.REGISTRY_ERROR_PARSING_XDS; - -public class LdsProtocol extends AbstractProtocol { - - private static final ErrorTypeAwareLogger logger = LoggerFactory.getErrorTypeAwareLogger(LdsProtocol.class); - - private static final XdsListenerResource xdsListenerResource = XdsListenerResource.getInstance(); - - public LdsProtocol(AdsObserver adsObserver, Node node, int checkInterval, ApplicationModel applicationModel) { - super(adsObserver, node, checkInterval, applicationModel); - List ldsListeners = - applicationModel.getExtensionLoader(LdsListener.class).getActivateExtensions(); - ldsListeners.forEach(this::registerListen); - } - - @Override - public String getTypeUrl() { - return "type.googleapis.com/envoy.config.listener.v3.Listener"; - } - - public void subscribeListeners() { - subscribeResource(null); - } - - @Override - protected Map decodeDiscoveryResponse(DiscoveryResponse response) { - if (!getTypeUrl().equals(response.getTypeUrl())) { - return Collections.emptyMap(); - } - - if (!getTypeUrl().equals(response.getTypeUrl())) { - return Collections.emptyMap(); - } - ValidatedResourceUpdate validatedResourceUpdate = - xdsListenerResource.parse(XdsResourceType.xdsResourceTypeArgs, response.getResourcesList()); - if (!validatedResourceUpdate.getErrors().isEmpty()) { - logger.error( - REGISTRY_ERROR_PARSING_XDS, - validatedResourceUpdate.getErrors().toArray()); - } - return validatedResourceUpdate.getParsedResources().entrySet().stream() - .collect(Collectors.toConcurrentMap( - Entry::getKey, e -> e.getValue().getResourceUpdate())); - } -} diff --git a/dubbo-xds/src/main/java/org/apache/dubbo/xds/protocol/impl/RdsProtocol.java b/dubbo-xds/src/main/java/org/apache/dubbo/xds/protocol/impl/RdsProtocol.java deleted file mode 100644 index 5b84f2cb17d..00000000000 --- a/dubbo-xds/src/main/java/org/apache/dubbo/xds/protocol/impl/RdsProtocol.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.dubbo.xds.protocol.impl; - -import org.apache.dubbo.common.logger.ErrorTypeAwareLogger; -import org.apache.dubbo.common.logger.LoggerFactory; -import org.apache.dubbo.rpc.model.ApplicationModel; -import org.apache.dubbo.xds.AdsObserver; -import org.apache.dubbo.xds.protocol.AbstractProtocol; -import org.apache.dubbo.xds.protocol.XdsResourceListener; -import org.apache.dubbo.xds.resource.XdsResourceType; -import org.apache.dubbo.xds.resource.XdsRouteConfigureResource; -import org.apache.dubbo.xds.resource.update.LdsUpdate; -import org.apache.dubbo.xds.resource.update.RdsUpdate; -import org.apache.dubbo.xds.resource.update.ValidatedResourceUpdate; - -import java.util.Collections; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; -import java.util.stream.Collectors; - -import io.envoyproxy.envoy.config.core.v3.Node; -import io.envoyproxy.envoy.service.discovery.v3.DiscoveryResponse; - -import static org.apache.dubbo.common.constants.LoggerCodeConstants.REGISTRY_ERROR_PARSING_XDS; - -public class RdsProtocol extends AbstractProtocol { - - private static final ErrorTypeAwareLogger logger = LoggerFactory.getErrorTypeAwareLogger(RdsProtocol.class); - - private static final XdsRouteConfigureResource xdsRouteConfigureResource = XdsRouteConfigureResource.getInstance(); - - public RdsProtocol(AdsObserver adsObserver, Node node, int checkInterval, ApplicationModel applicationModel) { - super(adsObserver, node, checkInterval, applicationModel); - } - - @Override - public String getTypeUrl() { - return "type.googleapis.com/envoy.config.route.v3.RouteConfiguration"; - } - - @Override - protected Map decodeDiscoveryResponse(DiscoveryResponse response) { - if (!getTypeUrl().equals(response.getTypeUrl())) { - return Collections.emptyMap(); - } - ValidatedResourceUpdate updates = - xdsRouteConfigureResource.parse(XdsResourceType.xdsResourceTypeArgs, response.getResourcesList()); - if (!updates.getInvalidResources().isEmpty()) { - logger.error(REGISTRY_ERROR_PARSING_XDS, updates.getErrors().toArray()); - } - return updates.getParsedResources().entrySet().stream() - .collect(Collectors.toConcurrentMap( - Entry::getKey, v -> v.getValue().getResourceUpdate())); - } - - public XdsResourceListener getLdsListener() { - return ldsListener; - } - - private final XdsResourceListener ldsListener = resource -> { - Set set = resource.stream() - .flatMap(l -> l.getListener().getFilterChains().stream()) - .map(c -> c.getHttpConnectionManager().getRdsName()) - .collect(Collectors.toSet()); - this.subscribeResource(set); - }; -} diff --git a/dubbo-xds/src/main/java/org/apache/dubbo/xds/resource/XdsClusterResource.java b/dubbo-xds/src/main/java/org/apache/dubbo/xds/resource/XdsClusterResource.java index 507244f339d..74155f67d5f 100644 --- a/dubbo-xds/src/main/java/org/apache/dubbo/xds/resource/XdsClusterResource.java +++ b/dubbo-xds/src/main/java/org/apache/dubbo/xds/resource/XdsClusterResource.java @@ -69,7 +69,7 @@ String typeName() { } @Override - String typeUrl() { + public String typeUrl() { return ADS_TYPE_URL_CDS; } diff --git a/dubbo-xds/src/main/java/org/apache/dubbo/xds/resource/XdsEndpointResource.java b/dubbo-xds/src/main/java/org/apache/dubbo/xds/resource/XdsEndpointResource.java index 89915023315..b628355371a 100644 --- a/dubbo-xds/src/main/java/org/apache/dubbo/xds/resource/XdsEndpointResource.java +++ b/dubbo-xds/src/main/java/org/apache/dubbo/xds/resource/XdsEndpointResource.java @@ -62,7 +62,7 @@ String typeName() { } @Override - String typeUrl() { + public String typeUrl() { return ADS_TYPE_URL_EDS; } diff --git a/dubbo-xds/src/main/java/org/apache/dubbo/xds/resource/XdsListenerResource.java b/dubbo-xds/src/main/java/org/apache/dubbo/xds/resource/XdsListenerResource.java index 74c094a7db0..6a233c436aa 100644 --- a/dubbo-xds/src/main/java/org/apache/dubbo/xds/resource/XdsListenerResource.java +++ b/dubbo-xds/src/main/java/org/apache/dubbo/xds/resource/XdsListenerResource.java @@ -90,7 +90,7 @@ Class unpackedClassName() { } @Override - String typeUrl() { + public String typeUrl() { return ADS_TYPE_URL_LDS; } diff --git a/dubbo-xds/src/main/java/org/apache/dubbo/xds/resource/XdsResourceType.java b/dubbo-xds/src/main/java/org/apache/dubbo/xds/resource/XdsResourceType.java index 3690b0f73ef..2f014f636b5 100644 --- a/dubbo-xds/src/main/java/org/apache/dubbo/xds/resource/XdsResourceType.java +++ b/dubbo-xds/src/main/java/org/apache/dubbo/xds/resource/XdsResourceType.java @@ -72,7 +72,7 @@ public abstract class XdsResourceType { abstract String typeName(); - abstract String typeUrl(); + public abstract String typeUrl(); // Do not confuse with the SotW approach: it is the mechanism in which the client must specify all // resource names it is interested in with each request. Different resource types may behave diff --git a/dubbo-xds/src/main/java/org/apache/dubbo/xds/resource/matcher/PathMatcher.java b/dubbo-xds/src/main/java/org/apache/dubbo/xds/resource/matcher/PathMatcher.java index 30fa7228349..56d9cf64a1f 100644 --- a/dubbo-xds/src/main/java/org/apache/dubbo/xds/resource/matcher/PathMatcher.java +++ b/dubbo-xds/src/main/java/org/apache/dubbo/xds/resource/matcher/PathMatcher.java @@ -76,7 +76,7 @@ public Pattern getRegEx() { return regEx; } - boolean isCaseSensitive() { + public boolean isCaseSensitive() { return caseSensitive; } diff --git a/dubbo-xds/src/main/java/org/apache/dubbo/xds/resource/route/RouteMatch.java b/dubbo-xds/src/main/java/org/apache/dubbo/xds/resource/route/RouteMatch.java index dcd11d36729..1b4512b0ba5 100644 --- a/dubbo-xds/src/main/java/org/apache/dubbo/xds/resource/route/RouteMatch.java +++ b/dubbo-xds/src/main/java/org/apache/dubbo/xds/resource/route/RouteMatch.java @@ -47,16 +47,16 @@ public RouteMatch( this.fractionMatcher = fractionMatcher; } - PathMatcher pathMatcher() { + public PathMatcher getPathMatcher() { return pathMatcher; } - List headerMatchers() { + public List getHeaderMatchers() { return headerMatchers; } @Nullable - FractionMatcher fractionMatcher() { + public FractionMatcher getFractionMatcher() { return fractionMatcher; } @@ -71,11 +71,11 @@ public boolean equals(Object o) { } if (o instanceof RouteMatch) { RouteMatch that = (RouteMatch) o; - return this.pathMatcher.equals(that.pathMatcher()) - && this.headerMatchers.equals(that.headerMatchers()) + return this.pathMatcher.equals(that.getPathMatcher()) + && this.headerMatchers.equals(that.getHeaderMatchers()) && (this.fractionMatcher == null - ? that.fractionMatcher() == null - : this.fractionMatcher.equals(that.fractionMatcher())); + ? that.getFractionMatcher() == null + : this.fractionMatcher.equals(that.getFractionMatcher())); } return false; } diff --git a/dubbo-xds/src/main/java/org/apache/dubbo/xds/resource/update/CdsUpdate.java b/dubbo-xds/src/main/java/org/apache/dubbo/xds/resource/update/CdsUpdate.java index 92f2b4bceef..4f24b772d9c 100644 --- a/dubbo-xds/src/main/java/org/apache/dubbo/xds/resource/update/CdsUpdate.java +++ b/dubbo-xds/src/main/java/org/apache/dubbo/xds/resource/update/CdsUpdate.java @@ -31,13 +31,13 @@ public class CdsUpdate implements ResourceUpdate { - enum ClusterType { + public enum ClusterType { EDS, LOGICAL_DNS, AGGREGATE } - enum LbPolicy { + public enum LbPolicy { ROUND_ROBIN, RING_HASH, LEAST_REQUEST diff --git a/dubbo-xds/src/main/java/org/apache/dubbo/xds/router/XdsRouter.java b/dubbo-xds/src/main/java/org/apache/dubbo/xds/router/XdsRouter.java index 04917e6e0b5..64d6055bad4 100644 --- a/dubbo-xds/src/main/java/org/apache/dubbo/xds/router/XdsRouter.java +++ b/dubbo-xds/src/main/java/org/apache/dubbo/xds/router/XdsRouter.java @@ -81,7 +81,9 @@ protected BitList> doRoute( private String matchCluster(Invocation invocation) { String cluster = null; String serviceName = invocation.getInvoker().getUrl().getParameter("provided-by"); - VirtualHost xdsVirtualHost = pilotExchanger.getXdsVirtualHostMap().get(serviceName); + // VirtualHost xdsVirtualHost = pilotExchanger.getXdsVirtualHostMap().get(serviceName); + // FIXME + VirtualHost xdsVirtualHost = xdsVirtualHostMap.get(serviceName); // match route for (Route xdsRoute : xdsVirtualHost.getRoutes()) {