diff --git a/build.gradle b/build.gradle index e188fde2..af798628 100644 --- a/build.gradle +++ b/build.gradle @@ -5,23 +5,16 @@ buildscript { } plugins { - id 'com.gradle.build-scan' version '2.0.2' // declare before any other plugin id 'com.google.osdetector' version '1.4.0' id "com.github.hierynomus.license" version '0.15.0' id 'com.github.sherter.google-java-format' version '0.8' apply false - id 'me.champeau.gradle.jmh' version '0.4.7' apply false + id 'me.champeau.gradle.jmh' version '0.5.0' apply false id 'io.morethan.jmhreport' version '0.6.2.1' apply false id 'io.spring.dependency-management' version '1.0.8.RELEASE' apply false id 'com.google.protobuf' version '0.8.8' apply false id 'com.palantir.git-version' version '0.12.0-rc2' } -//buildScan { licenseAgreementUrl = 'https://gradle.com/terms-of-service'; licenseAgree = 'yes' } -buildScan { - termsOfServiceUrl = 'https://gradle.com/terms-of-service' - termsOfServiceAgree = 'yes' -} - // TODO: make this a Gradle plugin someday def details = versionDetails() def versionSuffix = "" @@ -65,9 +58,9 @@ subprojects { targetCompatibility = 1.8 repositories { - - mavenCentral() maven { url 'https://oss.jfrog.org/oss-snapshot-local' } + mavenCentral() + jcenter() maven { url 'https://nexus.netifi.com/repository/jcenter/' credentials { diff --git a/dependency-management.gradle b/dependency-management.gradle index f69e869c..49c545b8 100644 --- a/dependency-management.gradle +++ b/dependency-management.gradle @@ -1,5 +1,7 @@ apply plugin: 'io.spring.dependency-management' +ext["rsocket.version"] = rsocketVersion + dependencyManagement { imports { mavenBom "io.rsocket:rsocket-bom:${rsocketVersion}" @@ -11,6 +13,7 @@ dependencyManagement { mavenBom "software.amazon.awssdk:bom:${awssdkVersion}" mavenBom "io.rsocket:rsocket-bom:${rsocketVersion}" mavenBom "com.google.protobuf:protobuf-bom:${protobufVersion}" + mavenBom "org.springframework.boot:spring-boot-dependencies:${springBootDependenciesVersion}" } dependencies { diff --git a/gradle.properties b/gradle.properties index c10d4c1c..aa866a3a 100644 --- a/gradle.properties +++ b/gradle.properties @@ -28,7 +28,7 @@ postgresqlVersion=42.2.5 protobufVersion=3.7.1 reactorBomVersion=Dysprosium-RELEASE roaringbitmapVersion=0.7.42 -rsocketRpcVersion=0.3.0-feature-prioritization-SNAPSHOT +rsocketRpcVersion=0.3.0-feature-routing-SNAPSHOT rsocketVersion=1.0.0-RC6-bugfix-prioritization-SNAPSHOT rxjava2JdbcVersion=0.2.4 slf4jVersion=1.7.25 diff --git a/gradle/wrapper/gradle-wrapper.jar b/gradle/wrapper/gradle-wrapper.jar index 87b738cb..5c2d1cf0 100644 Binary files a/gradle/wrapper/gradle-wrapper.jar and b/gradle/wrapper/gradle-wrapper.jar differ diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index ef9a9e05..94920145 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,5 +1,5 @@ distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-5.6-bin.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-6.0.1-bin.zip zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists diff --git a/gradlew b/gradlew index af6708ff..83f2acfd 100755 --- a/gradlew +++ b/gradlew @@ -1,5 +1,21 @@ #!/usr/bin/env sh +# +# Copyright 2015 the original author or authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + ############################################################################## ## ## Gradle start up script for UN*X @@ -28,7 +44,7 @@ APP_NAME="Gradle" APP_BASE_NAME=`basename "$0"` # Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. -DEFAULT_JVM_OPTS='"-Xmx64m"' +DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"' # Use the maximum available, or set MAX_FD != -1 to use that value. MAX_FD="maximum" @@ -109,8 +125,8 @@ if $darwin; then GRADLE_OPTS="$GRADLE_OPTS \"-Xdock:name=$APP_NAME\" \"-Xdock:icon=$APP_HOME/media/gradle.icns\"" fi -# For Cygwin, switch paths to Windows format before running java -if $cygwin ; then +# For Cygwin or MSYS, switch paths to Windows format before running java +if [ "$cygwin" = "true" -o "$msys" = "true" ] ; then APP_HOME=`cygpath --path --mixed "$APP_HOME"` CLASSPATH=`cygpath --path --mixed "$CLASSPATH"` JAVACMD=`cygpath --unix "$JAVACMD"` diff --git a/gradlew.bat b/gradlew.bat index 0f8d5937..24467a14 100644 --- a/gradlew.bat +++ b/gradlew.bat @@ -1,3 +1,19 @@ +@rem +@rem Copyright 2015 the original author or authors. +@rem +@rem Licensed under the Apache License, Version 2.0 (the "License"); +@rem you may not use this file except in compliance with the License. +@rem You may obtain a copy of the License at +@rem +@rem https://www.apache.org/licenses/LICENSE-2.0 +@rem +@rem Unless required by applicable law or agreed to in writing, software +@rem distributed under the License is distributed on an "AS IS" BASIS, +@rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +@rem See the License for the specific language governing permissions and +@rem limitations under the License. +@rem + @if "%DEBUG%" == "" @echo off @rem ########################################################################## @rem @@ -14,7 +30,7 @@ set APP_BASE_NAME=%~n0 set APP_HOME=%DIRNAME% @rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. -set DEFAULT_JVM_OPTS="-Xmx64m" +set DEFAULT_JVM_OPTS="-Xmx64m" "-Xms64m" @rem Find java.exe if defined JAVA_HOME goto findJavaFromJavaHome diff --git a/netifi-broker-client/src/main/java/com/netifi/broker/BrokerClient.java b/netifi-broker-client/src/main/java/com/netifi/broker/BrokerClient.java index d9c61920..f649ec2b 100644 --- a/netifi-broker-client/src/main/java/com/netifi/broker/BrokerClient.java +++ b/netifi-broker-client/src/main/java/com/netifi/broker/BrokerClient.java @@ -21,6 +21,7 @@ import com.netifi.broker.rsocket.BrokerSocket; import com.netifi.broker.rsocket.NamedRSocketClientWrapper; import com.netifi.broker.rsocket.transport.BrokerAddressSelectors; +import com.netifi.common.net.HostAndPort; import com.netifi.common.tags.Tag; import com.netifi.common.tags.Tags; import io.netty.buffer.ByteBuf; @@ -33,8 +34,10 @@ import io.opentracing.Tracer; import io.rsocket.Closeable; import io.rsocket.RSocket; -import io.rsocket.rpc.RSocketRpcService; -import io.rsocket.rpc.rsocket.RequestHandlingRSocket; +import io.rsocket.ipc.MutableRouter; +import io.rsocket.ipc.RoutingServerRSocket; +import io.rsocket.ipc.SelfRegistrable; +import io.rsocket.ipc.routing.SimpleRouter; import io.rsocket.transport.ClientTransport; import io.rsocket.transport.netty.client.TcpClientTransport; import io.rsocket.transport.netty.client.WebsocketClientTransport; @@ -43,11 +46,13 @@ import java.net.SocketAddress; import java.net.UnknownHostException; import java.nio.charset.StandardCharsets; +import java.time.Duration; import java.util.*; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.function.Function; import java.util.function.Supplier; +import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.Exceptions; @@ -56,6 +61,7 @@ import reactor.netty.tcp.TcpClient; /** This is where the magic happens */ +@Deprecated public class BrokerClient implements Closeable { private static final Logger logger = LoggerFactory.getLogger(BrokerClient.class); private static final ConcurrentHashMap BROKERCLIENT = @@ -71,9 +77,27 @@ public class BrokerClient implements Closeable { private final String group; private final String destination; private final Tags tags; - private final BrokerService brokerService; - private MonoProcessor onClose; - private RequestHandlingRSocket requestHandlingRSocket; + private final RoutingBrokerService brokerService; + private final Mono onClose; + private final MutableRouter router; + private RoutingServerRSocket routingServerRSocket; + + private BrokerClient(RoutingBrokerService routingBrokerService) { + brokerService = routingBrokerService; + tags = routingBrokerService.tags(); + destination = + routingBrokerService + .tags() + .stream() + .filter(tag -> tag.getKey().equals("com" + ".netifi.destination")) + .findFirst() + .map(Tag::getValue) + .orElse(null); + group = routingBrokerService.groupName(); + accesskey = routingBrokerService.accessKey(); + onClose = routingBrokerService.onClose(); + router = routingBrokerService.router(); + } private BrokerClient( long accessKey, @@ -88,10 +112,11 @@ private BrokerClient( long tickPeriodSeconds, long ackTimeoutSeconds, int missedAcks, - List seedAddresses, + List seedAddresses, Function addressSelector, Function clientTransportFactory, - RequestHandlingRSocket responder, + MutableRouter router, + RoutingServerRSocket responder, boolean responderRequiresUnwrapping, int poolSize, Supplier tracerSupplier, @@ -101,28 +126,41 @@ private BrokerClient( this.destination = destination; this.tags = tags; this.onClose = MonoProcessor.create(); - this.requestHandlingRSocket = responder; + this.router = router; + this.routingServerRSocket = responder; + + if (discoveryStrategy == null) { + discoveryStrategy = + () -> + Mono.just( + seedAddresses + .stream() + .map(isa -> HostAndPort.fromParts(isa.getHostName(), isa.getPort())) + .collect(Collectors.toList())); + } + this.brokerService = - new DefaultBrokerService( - seedAddresses, - requestHandlingRSocket, - responderRequiresUnwrapping, - inetAddress, - group, - addressSelector, - clientTransportFactory, - poolSize, - keepalive, - tickPeriodSeconds, - ackTimeoutSeconds, - missedAcks, - accessKey, - accessToken, - connectionIdSeed, - additionalFlags, - tags, - tracerSupplier.get(), - discoveryStrategy); + new DefaultRoutingBrokerService( + router, + new DefaultBrokerService( + routingServerRSocket, + responderRequiresUnwrapping, + inetAddress, + group, + tags, + connectionIdSeed, + addressSelector, + clientTransportFactory, + discoveryStrategy, + keepalive, + Duration.ofSeconds(tickPeriodSeconds), + Duration.ofSeconds(ackTimeoutSeconds), + missedAcks, + accessKey, + accessToken, + additionalFlags, + poolSize, + tracerSupplier.get())); } public String getGroup() { @@ -150,19 +188,23 @@ public static CustomizableBuilder customizable() { return new CustomizableBuilder(); } + public static > BrokerClient from( + RoutingBrokerService routingBrokerService) { + return new BrokerClient(routingBrokerService); + } + private static String defaultDestination() { return UUID.randomUUID().toString(); } @Override public void dispose() { - requestHandlingRSocket.dispose(); - onClose.onComplete(); + routingServerRSocket.dispose(); } @Override public boolean isDisposed() { - return onClose.isTerminated(); + return routingServerRSocket.isDisposed(); } @Override @@ -176,9 +218,9 @@ public Mono onClose() { * @param service the RSocketRpcService instance * @return current BrokerClient builder instance */ - public BrokerClient addService(RSocketRpcService service) { + public BrokerClient addService(SelfRegistrable service) { Objects.requireNonNull(service); - requestHandlingRSocket.addService(service); + service.selfRegister(router); return this; } @@ -318,11 +360,12 @@ public abstract static class CommonBuilder> { InetAddress inetAddress = DefaultBuilderConfig.getLocalAddress(); String host = DefaultBuilderConfig.getHost(); Integer port = DefaultBuilderConfig.getPort(); - List seedAddresses = DefaultBuilderConfig.getSeedAddress(); + List seedAddresses = DefaultBuilderConfig.getSeedAddress(); String netifiKey; - List socketAddresses; + List socketAddresses; DiscoveryStrategy discoveryStrategy = null; - RequestHandlingRSocket responder = new RequestHandlingRSocket(); // DEFAULT + MutableRouter router = new SimpleRouter(); // DEFAULT + Function responderBuilder = RoutingServerRSocket::new; boolean responderRequiresUnwrapping = true; // DEFAULT public SELF discoveryStrategy(DiscoveryStrategy discoveryStrategy) { @@ -433,11 +476,8 @@ public SELF port(int port) { } public SELF seedAddresses(Collection addresses) { - if (addresses instanceof List) { - this.seedAddresses = (List) addresses; - } else { - this.seedAddresses = new ArrayList<>(addresses); - } + this.seedAddresses = + addresses.stream().map(sa -> (InetSocketAddress) sa).collect(Collectors.toList()); return (SELF) this; } @@ -486,8 +526,14 @@ public SELF localAddress(InetAddress address) { return (SELF) this; } - public SELF requestHandler(RequestHandlingRSocket responder, boolean requiresUnwrapping) { - this.responder = responder; + public SELF router(MutableRouter router) { + this.router = router; + return (SELF) this; + } + + public SELF requestHandler( + Function responder, boolean requiresUnwrapping) { + this.responderBuilder = responder; this.responderRequiresUnwrapping = requiresUnwrapping; return (SELF) this; } @@ -635,7 +681,8 @@ public BrokerClient build() { socketAddresses, BrokerAddressSelectors.WEBSOCKET_ADDRESS, clientTransportFactory, - responder, + router, + responderBuilder.apply(router), responderRequiresUnwrapping, poolSize, tracerSupplier, @@ -728,7 +775,8 @@ public BrokerClient build() { socketAddresses, BrokerAddressSelectors.TCP_ADDRESS, clientTransportFactory, - responder, + router, + responderBuilder.apply(router), responderRequiresUnwrapping, poolSize, tracerSupplier, @@ -779,7 +827,8 @@ public BrokerClient build() { socketAddresses, addressSelector, clientTransportFactory, - responder, + router, + responderBuilder.apply(router), responderRequiresUnwrapping, poolSize, tracerSupplier, @@ -796,7 +845,7 @@ public static class Builder { private InetAddress inetAddress = DefaultBuilderConfig.getLocalAddress(); private String host = DefaultBuilderConfig.getHost(); private Integer port = DefaultBuilderConfig.getPort(); - private List seedAddresses = DefaultBuilderConfig.getSeedAddress(); + private List seedAddresses = DefaultBuilderConfig.getSeedAddress(); private Long accessKey = DefaultBuilderConfig.getAccessKey(); private String group = DefaultBuilderConfig.getGroup(); private String destination = DefaultBuilderConfig.getDestination(); @@ -904,11 +953,8 @@ public Builder discoveryStrategy(DiscoveryStrategy discoveryStrategy) { } public Builder seedAddresses(Collection addresses) { - if (addresses instanceof List) { - this.seedAddresses = (List) addresses; - } else { - this.seedAddresses = new ArrayList<>(addresses); - } + this.seedAddresses = + addresses.stream().map(sa -> (InetSocketAddress) sa).collect(Collectors.toList()); return this; } @@ -1090,7 +1136,7 @@ public BrokerClient build() { } } - List socketAddresses = null; + List socketAddresses = null; if (discoveryStrategy == null) { if (seedAddresses == null) { Objects.requireNonNull(host, "host is required"); @@ -1106,10 +1152,11 @@ public BrokerClient build() { String netifiKey = accessKey + group; - List _s = socketAddresses; + List _s = socketAddresses; return BROKERCLIENT.computeIfAbsent( netifiKey, _k -> { + SimpleRouter router = new SimpleRouter(); BrokerClient brokerClient = new BrokerClient( accessKey, @@ -1127,7 +1174,8 @@ public BrokerClient build() { _s, addressSelector, clientTransportFactory, - new RequestHandlingRSocket(), + router, + new RoutingServerRSocket(router), true, poolSize, tracerSupplier, diff --git a/netifi-broker-client/src/main/java/com/netifi/broker/BrokerFactory.java b/netifi-broker-client/src/main/java/com/netifi/broker/BrokerFactory.java new file mode 100644 index 00000000..a1f1429e --- /dev/null +++ b/netifi-broker-client/src/main/java/com/netifi/broker/BrokerFactory.java @@ -0,0 +1,935 @@ +/* + * Copyright 2019 The Netifi Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.netifi.broker; + +import com.netifi.broker.discovery.DiscoveryStrategy; +import com.netifi.broker.discovery.StaticListDiscoveryConfig; +import com.netifi.broker.discovery.StaticListDiscoveryStrategy; +import com.netifi.broker.frames.DestinationSetupFlyweight; +import com.netifi.broker.info.Broker; +import com.netifi.broker.rsocket.transport.BrokerAddressSelectors; +import com.netifi.common.net.HostAndPort; +import com.netifi.common.tags.Tag; +import com.netifi.common.tags.Tags; +import io.netty.buffer.Unpooled; +import io.netty.handler.ssl.OpenSsl; +import io.netty.handler.ssl.SslContext; +import io.netty.handler.ssl.SslContextBuilder; +import io.netty.handler.ssl.SslProvider; +import io.netty.handler.ssl.util.InsecureTrustManagerFactory; +import io.netty.util.CharsetUtil; +import io.opentracing.Tracer; +import io.rsocket.AbstractRSocket; +import io.rsocket.RSocket; +import io.rsocket.ipc.MutableRouter; +import io.rsocket.ipc.RoutingServerRSocket; +import io.rsocket.ipc.routing.SimpleRouter; +import io.rsocket.transport.ClientTransport; +import io.rsocket.transport.netty.client.TcpClientTransport; +import io.rsocket.transport.netty.client.WebsocketClientTransport; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.time.Duration; +import java.util.Arrays; +import java.util.Base64; +import java.util.Objects; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Supplier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import reactor.core.Exceptions; +import reactor.core.publisher.Mono; +import reactor.netty.tcp.TcpClient; + +public final class BrokerFactory { + + private static final Logger logger = LoggerFactory.getLogger(BrokerFactory.class); + private static final ConcurrentHashMap BROKERCLIENT = + new ConcurrentHashMap<>(); + private static final String DEFAULT_DESTINATION = defaultDestination(); + + private BrokerFactory() {} + + public static ClientBuilder connect() { + return new ClientBuilder(); + } + + private static String defaultDestination() { + return UUID.randomUUID().toString(); + } + + public interface KeepAliveConfig { + Duration tickPeriod(); + + Duration acknowledgeTimeout(); + + int missedAcknowledges(); + + static Builder builder() { + return new Builder(); + } + + static KeepAliveConfig noKeepAlive() { + return NoKeepAliveConfig.INSTANCE; + } + + abstract class BaseBuilder { + abstract KeepAliveConfig build(); + } + + class Builder extends BaseBuilder { + Duration tickPeriod = Duration.ofSeconds(DefaultBuilderConfig.getTickPeriodSeconds()); + Duration acknowledgeTimeout = Duration.ofSeconds(DefaultBuilderConfig.getAckTimeoutSeconds()); + int missedAcknowledges = DefaultBuilderConfig.getMissedAcks(); + + public Builder tickPeriod(Duration tickPeriod) { + this.tickPeriod = tickPeriod; + return this; + } + + public Builder acknowledgeTimeout(Duration acknowledgeTimeout) { + this.acknowledgeTimeout = acknowledgeTimeout; + return this; + } + + public Builder missedAcknowledges(int missedAcknowledges) { + this.missedAcknowledges = missedAcknowledges; + return this; + } + + KeepAliveConfig build() { + return new KeepAliveConfig() { + @Override + public Duration tickPeriod() { + return tickPeriod; + } + + @Override + public Duration acknowledgeTimeout() { + return acknowledgeTimeout; + } + + @Override + public int missedAcknowledges() { + return missedAcknowledges; + } + }; + } + } + + class NoKeepAliveBuilder extends BaseBuilder { + + @Override + KeepAliveConfig build() { + return NoKeepAliveConfig.INSTANCE; + } + } + + final class NoKeepAliveConfig implements KeepAliveConfig { + static final NoKeepAliveConfig INSTANCE = new NoKeepAliveConfig(); + + private NoKeepAliveConfig() {} + + @Override + public Duration tickPeriod() { + return Duration.ofMillis(-1); + } + + @Override + public Duration acknowledgeTimeout() { + return Duration.ofMillis(-1); + } + + @Override + public int missedAcknowledges() { + return -1; + } + } + + interface Spec { + void noKeepAlive(); + + Builder configure(); + } + + class DefaultSpec implements Spec { + + BaseBuilder builder; + + DefaultSpec() { + if (DefaultBuilderConfig.getKeepAlive()) { + configure(); + } else { + noKeepAlive(); + } + } + + @Override + public void noKeepAlive() { + builder = new NoKeepAliveBuilder(); + } + + @Override + public Builder configure() { + Builder builder = new Builder(); + this.builder = builder; + return builder; + } + } + } + + public interface SslConfig { + + Supplier sslContextProvider(); + + static SslConfig custom(Supplier sslContextSupplier) { + return () -> sslContextSupplier; + } + + static NoSslConfig noSslConfig() { + return NoSslConfig.INSTANCE; + } + + static SslConfig defaultSslConfig() { + return DefaultSslConfig.INSTANCE; + } + + final class NoSslConfig implements SslConfig { + static final NoSslConfig INSTANCE = new NoSslConfig(); + + private NoSslConfig() {} + + @Override + public Supplier sslContextProvider() { + return null; + } + } + + final class DefaultSslConfig implements SslConfig { + + static final DefaultSslConfig INSTANCE = new DefaultSslConfig(); + + private DefaultSslConfig() {} + + @Override + public Supplier sslContextProvider() { + return SUPPLIER_INSTANCE; + } + + static final Supplier SUPPLIER_INSTANCE = + () -> { + final SslProvider sslProvider; + if (OpenSsl.isAvailable()) { + logger.info("Native SSL provider is available; will use native provider."); + sslProvider = SslProvider.OPENSSL_REFCNT; + } else { + logger.info("Native SSL provider not available; will use JDK SSL provider."); + sslProvider = SslProvider.JDK; + } + try { + return SslContextBuilder.forClient() + .trustManager(InsecureTrustManagerFactory.INSTANCE) + .sslProvider(sslProvider) + .build(); + } catch (Throwable e) { + throw Exceptions.propagate(e); + } + }; + } + + interface Spec { + void unsecured(); + + void secured(); + + void secured(Supplier sslContextSupplier); + } + + class DefaultSpec implements Spec { + + SslConfig sslConfig; + + DefaultSpec() { + if (DefaultBuilderConfig.isSslDisabled()) { + unsecured(); + } else { + secured(); + } + } + + @Override + public void unsecured() { + sslConfig = new NoSslConfig(); + } + + @Override + public void secured() { + sslConfig = new DefaultSslConfig(); + } + + @Override + public void secured(Supplier sslContextSupplier) { + sslConfig = () -> sslContextSupplier; + } + } + } + + public interface DiscoveryConfig { + + DiscoveryStrategy discoveryStrategy(); + + static DiscoveryConfig staticDiscovery(int port, String... hosts) { + return () -> new StaticListDiscoveryStrategy(new StaticListDiscoveryConfig(port, hosts)); + } + + static DiscoveryConfig staticDiscovery(HostAndPort... addresses) { + return () -> (DiscoveryStrategy) () -> Mono.just(Arrays.asList(addresses)); + } + + static DiscoveryConfig custom(DiscoveryStrategy discoveryStrategy) { + return () -> discoveryStrategy; + } + + interface Spec { + void simple(int port, String... hosts); + + void simple(HostAndPort... addresses); + + void custom(DiscoveryStrategy discoveryStrategy); + } + + class DefaultSpec implements Spec { + + DiscoveryConfig config; + + @Override + public void simple(int port, String... hosts) { + config = staticDiscovery(port, hosts); + } + + @Override + public void simple(HostAndPort... addresses) { + config = staticDiscovery(addresses); + } + + @Override + public void custom(DiscoveryStrategy discoveryStrategy) { + config = DiscoveryConfig.custom(discoveryStrategy); + } + } + } + + public interface AuthenticationConfig { + long accessKey(); + + byte[] accessToken(); + + static AuthenticationConfig simple(long accessKey, String bast64EncodedAccessToken) { + return simple(accessKey, Base64.getDecoder().decode(bast64EncodedAccessToken)); + } + + static AuthenticationConfig simple(long accessKey, byte[] accessToken) { + return new AuthenticationConfig() { + @Override + public long accessKey() { + return accessKey; + } + + @Override + public byte[] accessToken() { + return accessToken; + } + }; + } + + static AuthenticationConfig jwt(String jwtTokenString) { + return new JwtAuthenticationConfig(jwtTokenString.getBytes(CharsetUtil.UTF_8)); + } + + static AuthenticationConfig jwt(byte[] jwtToken) { + return new JwtAuthenticationConfig(jwtToken); + } + + interface Spec { + JwtSpec jwt(); + + SimpleSpec simple(); + } + + interface SimpleSpec { + SimpleSpec key(long accessKey); + + SimpleSpec token(String bast64EncodedAccessToken); + + SimpleSpec token(byte[] accessToken); + } + + interface JwtSpec { + + JwtSpec token(String jwtTokenString); + + JwtSpec token(byte[] jwtToken); + } + + class DefaultSpec implements Spec { + + BaseSpec baseSpec; + + DefaultSpec() { + if ((DefaultBuilderConfig.getAdditionalConnectionFlags() + & DestinationSetupFlyweight.FLAG_ALTERNATIVE_AUTHENTICATION) + == DestinationSetupFlyweight.FLAG_ALTERNATIVE_AUTHENTICATION) { + jwt(); + } else { + simple(); + } + } + + @Override + public JwtSpec jwt() { + DefaultJwtSpec jwtSpec = new DefaultJwtSpec(); + baseSpec = jwtSpec; + return jwtSpec; + } + + @Override + public SimpleSpec simple() { + DefaultSimpleSpec simpleSpec = new DefaultSimpleSpec(); + baseSpec = simpleSpec; + return simpleSpec; + } + } + + abstract class BaseSpec { + long accessKey; + byte[] accessToken; + + abstract AuthenticationConfig build(); + } + + class DefaultSimpleSpec extends BaseSpec implements SimpleSpec { + + DefaultSimpleSpec() { + Long key = DefaultBuilderConfig.getAccessKey(); + if (key != null) { + key(key); + } + + String token = DefaultBuilderConfig.getAccessToken(); + if (token != null) { + token(token); + } + } + + @Override + public SimpleSpec key(long accessKey) { + this.accessKey = accessKey; + return this; + } + + @Override + public SimpleSpec token(String bast64EncodedAccessToken) { + return token(Base64.getDecoder().decode(bast64EncodedAccessToken)); + } + + @Override + public SimpleSpec token(byte[] accessToken) { + this.accessToken = accessToken; + return this; + } + + @Override + AuthenticationConfig build() { + return AuthenticationConfig.simple(accessKey, accessToken); + } + } + + class DefaultJwtSpec extends BaseSpec implements JwtSpec { + + DefaultJwtSpec() { + token(DefaultBuilderConfig.getAccessToken()); + } + + @Override + public JwtSpec token(String bast64EncodedAccessToken) { + return token(Base64.getDecoder().decode(bast64EncodedAccessToken)); + } + + @Override + public JwtSpec token(byte[] accessToken) { + this.accessToken = accessToken; + return this; + } + + @Override + AuthenticationConfig build() { + return AuthenticationConfig.jwt(accessToken); + } + } + + final class JwtAuthenticationConfig implements AuthenticationConfig { + + private final byte[] jwtToken; + + JwtAuthenticationConfig(byte[] jwtToken) { + this.jwtToken = jwtToken; + } + + @Override + public long accessKey() { + return DestinationSetupFlyweight.JWT_AUTHENTICATION; + } + + @Override + public byte[] accessToken() { + return jwtToken; + } + } + } + + public interface DestinationInfoConfig { + String group(); + + Tags tags(); + + InetAddress inetAddress(); + + boolean isPublic(); + + static Builder builder() { + return new Builder(); + } + + class Builder { + + private String group = DefaultBuilderConfig.getGroup(); + private String destination = DefaultBuilderConfig.getDestination(); + private Tags tags = DefaultBuilderConfig.getTags(); + private InetAddress localINetAddress = DefaultBuilderConfig.getLocalAddress(); + private boolean isPublic = + (DefaultBuilderConfig.getAdditionalConnectionFlags() + & DestinationSetupFlyweight.FLAG_ENABLE_PUBLIC_ACCESS) + == DestinationSetupFlyweight.FLAG_ENABLE_PUBLIC_ACCESS; + + public Builder asPublicDestination() { + isPublic = true; + return this; + } + + public Builder asPrivateDestination() { + isPublic = false; + return this; + } + + public Builder groupName(String group) { + this.group = group; + return this; + } + + public Builder localAddress(InetAddress localINetAddress) { + this.localINetAddress = localINetAddress; + return this; + } + + public Builder destinationTag(String destination) { + this.destination = destination; + return this; + } + + public Builder tag(String key, String value) { + this.tags = tags.and(key, value); + return this; + } + + public Builder tags(String... tags) { + this.tags = this.tags.and(tags); + return this; + } + + public Builder tags(Iterable tags) { + this.tags = this.tags.and(tags); + return this; + } + + DestinationInfoConfig build() { + Tags tags; + if (destination != null && !destination.isEmpty()) { + tags = this.tags.and("com.netifi.destination", destination); + } else { + tags = this.tags; + } + + return new DestinationInfoConfig() { + @Override + public String group() { + return group; + } + + @Override + public Tags tags() { + return tags; + } + + @Override + public InetAddress inetAddress() { + return localINetAddress; + } + + @Override + public boolean isPublic() { + return isPublic; + } + }; + } + } + } + + public interface ConnectionConfig { + + Function clientTransportFactory(); + + Function brokerAddressSelector(); + + String connectionName(); + + static Builder builder() { + return new Builder(); + } + + static WebSocketBuilder ws() { + return new WebSocketBuilder(); + } + + static TcpBuilder tcp() { + return new TcpBuilder(); + } + + interface Spec { + TcpBuilder tcp(); + + WebSocketBuilder ws(); + + Builder custom(); + } + + class DefaultSpec implements Spec { + + BaseBuilder baseBuilder; + + @Override + public TcpBuilder tcp() { + TcpBuilder tcpBuilder = ConnectionConfig.tcp(); + if (DefaultBuilderConfig.isSslDisabled()) { + tcpBuilder.ssl(SslConfig.Spec::unsecured); + } else { + tcpBuilder.ssl(SslConfig.Spec::secured); + } + baseBuilder = tcpBuilder; + return tcpBuilder; + } + + @Override + public WebSocketBuilder ws() { + WebSocketBuilder webSocketBuilder = ConnectionConfig.ws(); + if (DefaultBuilderConfig.isSslDisabled()) { + webSocketBuilder.ssl(SslConfig.Spec::unsecured); + } else { + webSocketBuilder.ssl(SslConfig.Spec::secured); + } + baseBuilder = webSocketBuilder; + return webSocketBuilder; + } + + @Override + public Builder custom() { + Builder builder = ConnectionConfig.builder(); + baseBuilder = builder; + return builder; + } + } + + abstract class BaseBuilder> { + String connectionName; + + public BaseBuilder() { + connectionName = DefaultBuilderConfig.getConnectionId(); + + if (connectionName == null || connectionName.isEmpty()) { + connectionName = UUID.randomUUID().toString(); + } + } + + public SELF withConnectionName(String connectionName) { + this.connectionName = connectionName; + return (SELF) this; + } + + abstract ConnectionConfig build(); + } + + abstract class TcpBasedBuilder> extends BaseBuilder { + + Consumer specBuilder = __ -> {}; + + public SELF ssl(Consumer specBuilder) { + this.specBuilder = Objects.requireNonNull(specBuilder); + return (SELF) this; + } + + static ConnectionConfig tcpBased( + Function transportFactory, + Function brokerAddressSelector, + String connectionName, + Consumer sslBuilder) { + SslConfig.DefaultSpec spec = new SslConfig.DefaultSpec(); + sslBuilder.accept(spec); + SslConfig sslConfig = spec.sslConfig; + + if (sslConfig instanceof SslConfig.NoSslConfig) { + return new ConnectionConfig() { + @Override + public Function clientTransportFactory() { + return address -> { + TcpClient client = TcpClient.create().addressSupplier(() -> address); + return transportFactory.apply(client); + }; + } + + @Override + public Function brokerAddressSelector() { + return brokerAddressSelector; + } + + @Override + public String connectionName() { + return connectionName; + } + }; + + } else { + final SslContext sslContext = sslConfig.sslContextProvider().get(); + return new ConnectionConfig() { + @Override + public Function clientTransportFactory() { + return address -> { + TcpClient client = + TcpClient.create() + .addressSupplier(() -> address) + .secure(sslContextSpec -> sslContextSpec.sslContext(sslContext)); + return transportFactory.apply(client); + }; + } + + @Override + public Function brokerAddressSelector() { + return BrokerAddressSelectors.TCP_ADDRESS; + } + + @Override + public String connectionName() { + return connectionName; + } + }; + } + } + } + + class TcpBuilder extends TcpBasedBuilder { + + ConnectionConfig build() { + return tcpBased( + TcpClientTransport::create, + BrokerAddressSelectors.TCP_ADDRESS, + connectionName, + specBuilder); + } + } + + class WebSocketBuilder extends TcpBasedBuilder { + + ConnectionConfig build() { + return tcpBased( + WebsocketClientTransport::create, + BrokerAddressSelectors.WEBSOCKET_ADDRESS, + connectionName, + specBuilder); + } + } + + class Builder extends BaseBuilder { + Function clientTransportFactory; + Function brokerAddressSelector; + + public Builder withTransportFactory( + Function clientTransportFactory) { + this.clientTransportFactory = clientTransportFactory; + return this; + } + + public Builder withBrokerAddressSelector( + Function brokerAddressSelector) { + this.brokerAddressSelector = brokerAddressSelector; + return this; + } + + @Override + public ConnectionConfig build() { + return new ConnectionConfig() { + @Override + public Function clientTransportFactory() { + return clientTransportFactory; + } + + @Override + public Function brokerAddressSelector() { + return brokerAddressSelector; + } + + @Override + public String connectionName() { + return connectionName; + } + }; + } + } + } + + public static final class ClientBuilder { + + private AuthenticationConfig.DefaultSpec authenticationSpec = + new AuthenticationConfig.DefaultSpec(); + private DestinationInfoConfig.Builder destinationInfoConfigBuilder = + DestinationInfoConfig.builder(); + private ConnectionConfig.DefaultSpec connectionSpec = new ConnectionConfig.DefaultSpec(); + private KeepAliveConfig.DefaultSpec keepAliveConfigSpec = new KeepAliveConfig.DefaultSpec(); + private DiscoveryConfig.DefaultSpec discoveryConfigSpec = new DiscoveryConfig.DefaultSpec(); + private int poolSize = DefaultBuilderConfig.getPoolSize(); + private Tracer tracer; + + private Consumer authenticationBuilder = (__) -> {}; + private Consumer destinationInfoBuilder = (__) -> {}; + private Consumer keepAliveBuilder = (__) -> {}; + private Consumer connectionBuilder = (__) -> {}; + private Consumer discoveryBuilder = (__) -> {}; + + public ClientBuilder authentication(Consumer authenticationBuilder) { + this.authenticationBuilder = + this.authenticationBuilder.andThen(Objects.requireNonNull(authenticationBuilder)); + return this; + } + + public ClientBuilder destinationInfo( + Consumer destinationInfoBuilder) { + this.destinationInfoBuilder = + this.destinationInfoBuilder.andThen(Objects.requireNonNull(destinationInfoBuilder)); + return this; + } + + public ClientBuilder keepAlive(Consumer keepAliveBuilder) { + this.keepAliveBuilder = + this.keepAliveBuilder.andThen(Objects.requireNonNull(keepAliveBuilder)); + return this; + } + + public ClientBuilder connection(Consumer connectionBuilder) { + this.connectionBuilder = + this.connectionBuilder.andThen(Objects.requireNonNull(connectionBuilder)); + return this; + } + + public ClientBuilder discoveryStrategy(Consumer discoveryBuilder) { + this.discoveryBuilder = + this.discoveryBuilder.andThen(Objects.requireNonNull(discoveryBuilder)); + return this; + } + + public ClientBuilder poolSize(int poolSize) { + this.poolSize = poolSize; + return this; + } + + public ClientBuilder tracer(Tracer tracer) { + this.tracer = tracer; + return this; + } + + public BrokerService toService() { + return toService(new AbstractRSocket() {}, false); + } + + public RoutingBrokerService toRoutingService() { + return toRoutingService(new SimpleRouter()); + } + + public RoutingBrokerService toRoutingService(MutableRouter router) { + DefaultBrokerService brokerService = + (DefaultBrokerService) toService(new RoutingServerRSocket(router), true); + return new DefaultRoutingBrokerService(router, brokerService); + } + + public BrokerService toService(RSocket rSocket, boolean responderRequiresUnwrapping) { + destinationInfoBuilder.accept(destinationInfoConfigBuilder); + DestinationInfoConfig destinationInfoConfig = destinationInfoConfigBuilder.build(); + + connectionBuilder.accept(connectionSpec); + ConnectionConfig connectionConfig = connectionSpec.baseBuilder.build(); + + discoveryBuilder.accept(discoveryConfigSpec); + DiscoveryConfig discoveryConfig = discoveryConfigSpec.config; + + keepAliveBuilder.accept(keepAliveConfigSpec); + KeepAliveConfig keepAliveConfig = keepAliveConfigSpec.builder.build(); + + authenticationBuilder.accept(authenticationSpec); + AuthenticationConfig authenticationConfig = authenticationSpec.baseSpec.build(); + + short additionalFlags = DefaultBuilderConfig.getAdditionalConnectionFlags(); + + if (destinationInfoConfig.isPublic()) { + additionalFlags |= DestinationSetupFlyweight.FLAG_ENABLE_PUBLIC_ACCESS; + } + + if (authenticationConfig instanceof AuthenticationConfig.JwtAuthenticationConfig) { + additionalFlags |= DestinationSetupFlyweight.FLAG_ALTERNATIVE_AUTHENTICATION; + } + + return new DefaultBrokerService( + rSocket, + responderRequiresUnwrapping, + destinationInfoConfig.inetAddress(), + destinationInfoConfig.group(), + destinationInfoConfig.tags(), + connectionConfig.connectionName(), + connectionConfig.brokerAddressSelector(), + connectionConfig.clientTransportFactory(), + discoveryConfig.discoveryStrategy(), + keepAliveConfig instanceof KeepAliveConfig.NoKeepAliveConfig, + keepAliveConfig.tickPeriod(), + keepAliveConfig.acknowledgeTimeout(), + keepAliveConfig.missedAcknowledges(), + authenticationConfig.accessKey(), + Unpooled.wrappedBuffer(authenticationConfig.accessToken()), + additionalFlags, + poolSize, + tracer); + } + } +} diff --git a/netifi-broker-client/src/main/java/com/netifi/broker/BrokerService.java b/netifi-broker-client/src/main/java/com/netifi/broker/BrokerService.java index 4c80c209..38e83983 100644 --- a/netifi-broker-client/src/main/java/com/netifi/broker/BrokerService.java +++ b/netifi-broker-client/src/main/java/com/netifi/broker/BrokerService.java @@ -26,8 +26,14 @@ import io.rsocket.Payload; import io.rsocket.RSocket; import io.rsocket.util.ByteBufPayload; +import reactor.core.Disposable; +import reactor.core.publisher.Mono; + +public interface BrokerService extends Disposable, InstanceInfoAware { + default BrokerSocket group(CharSequence group) { + return group(group, Tags.empty()); + } -interface BrokerService { default BrokerSocket group(CharSequence group, Tags tags) { return new DefaultBrokerSocket( payload -> { @@ -42,6 +48,10 @@ default BrokerSocket group(CharSequence group, Tags tags) { this::selectRSocket); } + default BrokerSocket broadcast(CharSequence group) { + return broadcast(group, Tags.empty()); + } + default BrokerSocket broadcast(CharSequence group, Tags tags) { return new DefaultBrokerSocket( payload -> { @@ -56,6 +66,10 @@ default BrokerSocket broadcast(CharSequence group, Tags tags) { this::selectRSocket); } + default BrokerSocket shard(CharSequence group, ByteBuf shardKey) { + return shard(group, shardKey, Tags.empty()); + } + default BrokerSocket shard(CharSequence group, ByteBuf shardKey, Tags tags) { return new DefaultBrokerSocket( payload -> { @@ -72,4 +86,6 @@ default BrokerSocket shard(CharSequence group, ByteBuf shardKey, Tags tags) { } RSocket selectRSocket(); + + Mono onClose(); } diff --git a/netifi-broker-client/src/main/java/com/netifi/broker/DefaultBrokerService.java b/netifi-broker-client/src/main/java/com/netifi/broker/DefaultBrokerService.java index 07da2c06..afc6731a 100644 --- a/netifi-broker-client/src/main/java/com/netifi/broker/DefaultBrokerService.java +++ b/netifi-broker-client/src/main/java/com/netifi/broker/DefaultBrokerService.java @@ -38,7 +38,6 @@ import io.opentracing.Tracer; import io.rsocket.Payload; import io.rsocket.RSocket; -import io.rsocket.ResponderRSocket; import io.rsocket.transport.ClientTransport; import io.rsocket.util.ByteBufPayload; import java.net.InetAddress; @@ -60,7 +59,12 @@ import reactor.core.publisher.Mono; import reactor.core.publisher.MonoProcessor; -public class DefaultBrokerService implements BrokerService, Disposable { +public class DefaultBrokerService implements BrokerService, Disposable, InstanceInfoAware { + static { + // Set the Java DNS cache to 60 seconds + java.security.Security.setProperty("networkaddress.cache.ttl", "60"); + } + private static final Logger logger = LoggerFactory.getLogger(DefaultBrokerService.class); private static final double EXP_FACTOR = 4.0; private static final double DEFAULT_LOWER_QUANTILE = 0.5; @@ -77,8 +81,8 @@ public class DefaultBrokerService implements BrokerService, Disposable { private final InetAddress localInetAddress; private final String group; private final boolean keepalive; - private final long tickPeriodSeconds; - private final long ackTimeoutSeconds; + private final Duration tickPeriod; + private final Duration ackTimeout; private final int missedAcks; private final long accessKey; private final ByteBuf accessToken; @@ -100,37 +104,27 @@ public class DefaultBrokerService implements BrokerService, Disposable { private volatile Disposable disposable; public DefaultBrokerService( - List seedAddresses, - ResponderRSocket requestHandlingRSocket, + RSocket requestHandlingRSocket, boolean responderRequiresUnwrapping, InetAddress localInetAddress, String group, + Tags tags, + String connectionName, Function addressSelector, Function clientTransportFactory, - int poolSize, + DiscoveryStrategy discoveryStrategy, boolean keepalive, - long tickPeriodSeconds, - long ackTimeoutSeconds, + Duration tickPeriod, + Duration ackTimeout, int missedAcks, long accessKey, ByteBuf accessToken, - String connectionIdSeed, short additionalSetupFlags, - Tags tags, - Tracer tracer, - DiscoveryStrategy discoveryStrategy) { + int poolSize, + Tracer tracer) { this.discoveryStrategy = discoveryStrategy; - - if (discoveryStrategy == null) { - if (seedAddresses.isEmpty()) { - throw new IllegalStateException("seedAddress is empty"); - } else { - this.seedAddresses = seedAddresses; - } - } else { - this.seedAddresses = new CopyOnWriteArrayList<>(); - } + this.seedAddresses = new CopyOnWriteArrayList<>(); Objects.requireNonNull(accessToken); if (accessToken.readableBytes() == 0) { @@ -153,21 +147,19 @@ public DefaultBrokerService( this.selectRefreshTimeout = System.currentTimeMillis(); this.selectRefreshTimeoutDuration = 10_000; this.keepalive = keepalive; - this.tickPeriodSeconds = tickPeriodSeconds; - this.ackTimeoutSeconds = ackTimeoutSeconds; + this.tickPeriod = tickPeriod; + this.ackTimeout = ackTimeout; this.missedAcks = missedAcks; this.accessKey = accessKey; this.accessToken = accessToken; - this.connectionIdSeed = connectionIdSeed; + this.connectionIdSeed = connectionName; this.additionalSetupFlags = additionalSetupFlags; this.tags = tags; this.setupMetadata = new ArrayList<>(); this.onClose = MonoProcessor.create(); - if (discoveryStrategy != null) { - logger.info("discovery strategy found using " + discoveryStrategy.getClass()); - useDiscoveryStrategy(); - } + logger.info("discovery strategy found using " + discoveryStrategy.getClass()); + useDiscoveryStrategy(); this.client = new BrokerInfoServiceClient(group("com.netifi.broker.brokerServices", Tags.empty())); @@ -430,8 +422,8 @@ private WeightedReconnectingRSocket createWeightedReconnectingRSocket() { this::isDisposed, this::selectClientTransportSupplier, keepalive, - tickPeriodSeconds, - ackTimeoutSeconds, + tickPeriod, + ackTimeout, missedAcks, accessKey, accessToken, @@ -550,6 +542,23 @@ public RSocket selectRSocket() { return rSocket; } + public Tags tags() { + return tags; + } + + public long accessKey() { + return accessKey; + } + + public String groupName() { + return group; + } + + @Override + public Mono onClose() { + return onClose; + } + private static double algorithmicWeight( final WeightedRSocket socket, final Quantile lowerQuantile, final Quantile higherQuantile) { if (socket == null || socket.availability() == 0.0) { diff --git a/netifi-broker-client/src/main/java/com/netifi/broker/DefaultBuilderConfig.java b/netifi-broker-client/src/main/java/com/netifi/broker/DefaultBuilderConfig.java index 37bd0cba..5c3869b5 100644 --- a/netifi-broker-client/src/main/java/com/netifi/broker/DefaultBuilderConfig.java +++ b/netifi-broker-client/src/main/java/com/netifi/broker/DefaultBuilderConfig.java @@ -20,7 +20,6 @@ import com.typesafe.config.*; import java.net.InetAddress; import java.net.InetSocketAddress; -import java.net.SocketAddress; import java.net.UnknownHostException; import java.util.ArrayList; import java.util.List; @@ -29,7 +28,8 @@ import java.util.stream.Stream; /** - * Gets current default configuration for {@link BrokerClient.Builder}. Can be overriden with System + * Gets current default configuration for {@link BrokerFactory}. Can be overridden with + * System * properties, or if the application provides a config file. The builder will over-ride these values * if they are based directly in to the builder. Otherwise it will these values a default. */ @@ -293,8 +293,8 @@ static boolean getExportSystemMetrics() { return exportSystemMetrics; } - static List getSeedAddress() { - List seedAddresses = null; + static List getSeedAddress() { + List seedAddresses = null; try { String s = conf.getString("netifi.client.seedAddresses"); if (s != null) { diff --git a/netifi-broker-client/src/main/java/com/netifi/broker/DefaultRoutingBrokerService.java b/netifi-broker-client/src/main/java/com/netifi/broker/DefaultRoutingBrokerService.java new file mode 100644 index 00000000..b2aee753 --- /dev/null +++ b/netifi-broker-client/src/main/java/com/netifi/broker/DefaultRoutingBrokerService.java @@ -0,0 +1,73 @@ +/* + * Copyright 2019 The Netifi Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.netifi.broker; + +import com.netifi.common.tags.Tags; +import io.rsocket.RSocket; +import io.rsocket.ipc.MutableRouter; +import reactor.core.publisher.Mono; + +class DefaultRoutingBrokerService implements RoutingBrokerService, InstanceInfoAware { + + private final MutableRouter router; + private final DefaultBrokerService brokerService; + + public DefaultRoutingBrokerService(MutableRouter router, DefaultBrokerService brokerService) { + this.router = router; + this.brokerService = brokerService; + } + + @Override + public String groupName() { + return brokerService.groupName(); + } + + @Override + public Tags tags() { + return brokerService.tags(); + } + + @Override + public long accessKey() { + return brokerService.accessKey(); + } + + @Override + public MutableRouter router() { + return router; + } + + @Override + public RSocket selectRSocket() { + return brokerService.selectRSocket(); + } + + @Override + public Mono onClose() { + return brokerService.onClose(); + } + + @Override + public void dispose() { + brokerService.dispose(); + } + + @Override + public boolean isDisposed() { + return brokerService.isDisposed(); + } +} diff --git a/netifi-broker-client/src/main/java/com/netifi/broker/InstanceInfoAware.java b/netifi-broker-client/src/main/java/com/netifi/broker/InstanceInfoAware.java new file mode 100644 index 00000000..67ccba66 --- /dev/null +++ b/netifi-broker-client/src/main/java/com/netifi/broker/InstanceInfoAware.java @@ -0,0 +1,28 @@ +/* + * Copyright 2019 The Netifi Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.netifi.broker; + +import com.netifi.common.tags.Tags; + +public interface InstanceInfoAware { + + String groupName(); + + Tags tags(); + + long accessKey(); +} diff --git a/netifi-broker-client/src/main/java/com/netifi/broker/RoutingBrokerService.java b/netifi-broker-client/src/main/java/com/netifi/broker/RoutingBrokerService.java new file mode 100644 index 00000000..f7357d35 --- /dev/null +++ b/netifi-broker-client/src/main/java/com/netifi/broker/RoutingBrokerService.java @@ -0,0 +1,48 @@ +/* + * Copyright 2019 The Netifi Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.netifi.broker; + +import io.rsocket.Payload; +import io.rsocket.ipc.MutableRouter; +import io.rsocket.ipc.util.IPCChannelFunction; +import io.rsocket.ipc.util.IPCFunction; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +public interface RoutingBrokerService> + extends BrokerService { + MutableRouter router(); + + default SELF withFireAndForgetHandler(String route, IPCFunction> handler) { + router().withFireAndForgetRoute(route, handler); + return (SELF) this; + } + + default SELF withRequestResponseHandler(String route, IPCFunction> handler) { + router().withRequestResponseRoute(route, handler); + return (SELF) this; + } + + default SELF withRequestStreamHandler(String route, IPCFunction> handler) { + router().withRequestStreamRoute(route, handler); + return (SELF) this; + } + + default SELF withRequestChannelHandler(String route, IPCChannelFunction handler) { + router().withRequestChannelRoute(route, handler); + return (SELF) this; + } +} diff --git a/netifi-broker-client/src/main/java/com/netifi/broker/rsocket/WeightedReconnectingRSocket.java b/netifi-broker-client/src/main/java/com/netifi/broker/rsocket/WeightedReconnectingRSocket.java index 013cd75f..90a24de0 100644 --- a/netifi-broker-client/src/main/java/com/netifi/broker/rsocket/WeightedReconnectingRSocket.java +++ b/netifi-broker-client/src/main/java/com/netifi/broker/rsocket/WeightedReconnectingRSocket.java @@ -62,8 +62,8 @@ public class WeightedReconnectingRSocket implements WeightedRSocket { private final Supplier setupPayloadSupplier; private final BooleanSupplier running; private final boolean keepalive; - private final long tickPeriodSeconds; - private final long ackTimeoutSeconds; + private final Duration tickPeriod; + private final Duration ackTimeout; private final int missedAcks; private final RSocket requestHandlingRSocket; private final long accessKey; @@ -91,8 +91,8 @@ public class WeightedReconnectingRSocket implements WeightedRSocket { final BooleanSupplier running, final Supplier transportSupplier, final boolean keepalive, - final long tickPeriodSeconds, - final long ackTimeoutSeconds, + final Duration tickPeriod, + final Duration ackTimeout, final int missedAcks, final long accessKey, final ByteBuf accessToken, @@ -121,8 +121,8 @@ public class WeightedReconnectingRSocket implements WeightedRSocket { this.keepalive = keepalive; this.accessKey = accessKey; this.accessToken = accessToken; - this.tickPeriodSeconds = tickPeriodSeconds; - this.ackTimeoutSeconds = ackTimeoutSeconds; + this.tickPeriod = tickPeriod; + this.ackTimeout = ackTimeout; this.missedAcks = missedAcks; } @@ -132,8 +132,8 @@ public static WeightedReconnectingRSocket newInstance( final BooleanSupplier running, final Supplier transportSupplier, final boolean keepalive, - final long tickPeriodSeconds, - final long ackTimeoutSeconds, + final Duration tickPeriod, + final Duration ackTimeout, final int missedAcks, final long accessKey, final ByteBuf accessToken, @@ -147,8 +147,8 @@ public static WeightedReconnectingRSocket newInstance( running, transportSupplier, keepalive, - tickPeriodSeconds, - ackTimeoutSeconds, + tickPeriod, + ackTimeout, missedAcks, accessKey, accessToken, @@ -197,18 +197,9 @@ private ClientRSocketFactory getClientFactory() { ClientRSocketFactory connect = RSocketFactory.connect().frameDecoder(PayloadDecoder.ZERO_COPY); if (keepalive) { - connect = - connect - .keepAlive() - .keepAliveTickPeriod(Duration.ofSeconds(tickPeriodSeconds)) - .keepAliveAckTimeout(Duration.ofSeconds(ackTimeoutSeconds)) - .keepAliveMissedAcks(missedAcks); + connect = connect.keepAlive(tickPeriod, ackTimeout, missedAcks); } else { - connect - .keepAlive() - .keepAliveTickPeriod(Duration.ofSeconds(0)) - .keepAliveAckTimeout(Duration.ofSeconds(0)) - .keepAliveMissedAcks(missedAcks); + connect = connect.keepAlive(Duration.ofSeconds(0), Duration.ofSeconds(0), missedAcks); } return connect.setupPayload(setupPayloadSupplier.get()); @@ -635,10 +626,10 @@ public String toString() { + running + ", keepalive=" + keepalive - + ", tickPeriodSeconds=" - + tickPeriodSeconds - + ", ackTimeoutSeconds=" - + ackTimeoutSeconds + + ", tickPeriodMillis=" + + tickPeriod.toMillis() + + ", ackTimeoutMillis=" + + ackTimeout.toMillis() + ", missedAcks=" + missedAcks + ", accessKey=" diff --git a/netifi-broker-client/src/test/java/com/netifi/broker/DefaultBuilderConfigTest.java b/netifi-broker-client/src/test/java/com/netifi/broker/DefaultBuilderConfigTest.java index 34fdae54..a7858e80 100644 --- a/netifi-broker-client/src/test/java/com/netifi/broker/DefaultBuilderConfigTest.java +++ b/netifi-broker-client/src/test/java/com/netifi/broker/DefaultBuilderConfigTest.java @@ -18,7 +18,6 @@ import com.netifi.common.tags.Tag; import com.netifi.common.tags.Tags; import java.net.InetSocketAddress; -import java.net.SocketAddress; import java.util.List; import java.util.Optional; import org.junit.Assert; @@ -31,7 +30,7 @@ public class DefaultBuilderConfigTest { @Test public void testShouldFindSingleSeedAddress() { System.setProperty("netifi.client.seedAddresses", "localhost:8001"); - List seedAddress = DefaultBuilderConfig.getSeedAddress(); + List seedAddress = DefaultBuilderConfig.getSeedAddress(); Assert.assertNotNull(seedAddress); Assert.assertEquals(1, seedAddress.size()); @@ -43,7 +42,7 @@ public void testShouldFindSingleSeedAddress() { public void testShouldFindMultipleSeedAddresses() { System.setProperty( "netifi.client.seedAddresses", "localhost:8001,localhost:8002,localhost:8003"); - List seedAddress = DefaultBuilderConfig.getSeedAddress(); + List seedAddress = DefaultBuilderConfig.getSeedAddress(); Assert.assertNotNull(seedAddress); Assert.assertEquals(3, seedAddress.size()); @@ -54,13 +53,13 @@ public void testShouldFindMultipleSeedAddresses() { @Test(expected = IllegalStateException.class) public void testShouldThrowExceptionForAddressMissingPort() { System.setProperty("netifi.client.seedAddresses", "localhost:8001,localhost,localhost:8003"); - List seedAddress = DefaultBuilderConfig.getSeedAddress(); + List seedAddress = DefaultBuilderConfig.getSeedAddress(); } @Test(expected = IllegalStateException.class) public void testShouldThrowExceptionForInvalidAddress() { System.setProperty("netifi.client.seedAddresses", "no way im valid"); - List seedAddress = DefaultBuilderConfig.getSeedAddress(); + List seedAddress = DefaultBuilderConfig.getSeedAddress(); } @Test @@ -75,7 +74,7 @@ public void testShouldParseTagsSuccessfully() { @Test public void testShouldReturnNull() { - List seedAddress = DefaultBuilderConfig.getSeedAddress(); + List seedAddress = DefaultBuilderConfig.getSeedAddress(); Assert.assertNull(seedAddress); } diff --git a/netifi-broker-client/src/test/java/com/netifi/broker/integration/BrokerClientIntegrationTest.java b/netifi-broker-client/src/test/java/com/netifi/broker/integration/BrokerClientIntegrationTest.java index fb8ef979..c3f5db40 100644 --- a/netifi-broker-client/src/test/java/com/netifi/broker/integration/BrokerClientIntegrationTest.java +++ b/netifi-broker-client/src/test/java/com/netifi/broker/integration/BrokerClientIntegrationTest.java @@ -16,14 +16,14 @@ package com.netifi.broker.integration; import com.google.protobuf.Empty; -import com.netifi.broker.BrokerClient; +import com.netifi.broker.BrokerFactory; +import com.netifi.broker.RoutingBrokerService; import com.netifi.broker.rsocket.BrokerSocket; import com.netifi.broker.testing.protobuf.SimpleRequest; import com.netifi.broker.testing.protobuf.SimpleResponse; import com.netifi.broker.testing.protobuf.SimpleService; import com.netifi.broker.testing.protobuf.SimpleServiceClient; import com.netifi.broker.testing.protobuf.SimpleServiceServer; -import com.netifi.common.tags.Tags; import io.netty.buffer.ByteBuf; import java.time.Duration; import java.util.Optional; @@ -48,39 +48,36 @@ public class BrokerClientIntegrationTest { private static final String host = "localhost"; private static final int port = 8001; private static final int server_port = 8001; - private static BrokerClient server; - private static BrokerClient brokerClient; + private static RoutingBrokerService server; + private static RoutingBrokerService brokerClient; private static BrokerSocket brokerSocket; @BeforeClass public static void setup() { server = - BrokerClient.tcp() - .keepalive(false) - .group("test.server") - .destination("server") - .accessKey(accessKey) - .accessToken(accessToken) - .host(host) - .port(server_port) - .build(); + BrokerFactory.connect() + .destinationInfo(spec -> spec.groupName("test.server").destinationTag("server")) + .keepAlive(spec -> spec.noKeepAlive()) + .authentication(spec -> spec.simple().key(accessKey).token(accessToken)) + .connection(spec -> spec.tcp()) + .discoveryStrategy(spec -> spec.simple(port, host)) + .toRoutingService(); brokerClient = - BrokerClient.tcp() - .keepalive(false) - .group("test.brokerClient") - .destination("brokerClient") - .accessKey(accessKey) - .accessToken(accessToken) - .host(host) - .port(port) - .build(); - - server.addService( - new SimpleServiceServer( - new DefaultSimpleService(), Optional.empty(), Optional.empty(), Optional.empty())); - - brokerSocket = brokerClient.groupServiceSocket("test.server", Tags.empty()); + BrokerFactory.connect() + .keepAlive(spec -> spec.noKeepAlive()) + .connection(spec -> spec.tcp()) + .destinationInfo( + spec -> spec.groupName("test.brokerClient").destinationTag("brokerClient")) + .authentication(spec -> spec.simple().key(accessKey).token(accessToken)) + .discoveryStrategy(spec -> spec.simple(port, host)) + .toRoutingService(); + + new SimpleServiceServer( + new DefaultSimpleService(), Optional.empty(), Optional.empty(), Optional.empty()) + .selfRegister(server.router()); + + brokerSocket = brokerClient.group("test.server"); } @Test diff --git a/netifi-broker-client/src/test/java/com/netifi/broker/rsocket/WeightedReconnectingRSocketTest.java b/netifi-broker-client/src/test/java/com/netifi/broker/rsocket/WeightedReconnectingRSocketTest.java index b9af9263..f8bc978e 100644 --- a/netifi-broker-client/src/test/java/com/netifi/broker/rsocket/WeightedReconnectingRSocketTest.java +++ b/netifi-broker-client/src/test/java/com/netifi/broker/rsocket/WeightedReconnectingRSocketTest.java @@ -39,8 +39,8 @@ public void testShouldWaitForSocketWhenNotPresent() { () -> true, Mockito.mock(Supplier.class), false, - 0, - 0, + Duration.ZERO, + Duration.ZERO, 0, 0, Unpooled.EMPTY_BUFFER, @@ -66,8 +66,8 @@ public void testShouldSetRSocketAndReturnSocket() { () -> true, Mockito.mock(Supplier.class), false, - 0, - 0, + Duration.ZERO, + Duration.ZERO, 0, 0, Unpooled.EMPTY_BUFFER, @@ -109,8 +109,8 @@ public void testShouldEmitNewRSocketAfterSubscribing() throws Exception { () -> true, Mockito.mock(Supplier.class), false, - 0, - 0, + Duration.ZERO, + Duration.ZERO, 0, 0, Unpooled.EMPTY_BUFFER, @@ -140,8 +140,8 @@ public void testShouldWaitAfterRSocketCloses() { () -> true, Mockito.mock(Supplier.class), false, - 0, - 0, + Duration.ZERO, + Duration.ZERO, 0, 0, Unpooled.EMPTY_BUFFER, diff --git a/netifi-broker-info-idl/gradle/dependency-locks/testCompileClasspath.lockfile~merged b/netifi-broker-info-idl/gradle/dependency-locks/testCompileClasspath.lockfile~merged deleted file mode 100644 index 597c5914..00000000 --- a/netifi-broker-info-idl/gradle/dependency-locks/testCompileClasspath.lockfile~merged +++ /dev/null @@ -1,35 +0,0 @@ -# This is a Gradle generated file for dependency locking. -# Manual edits can break the build and are not advised. -# This file is expected to be part of source control. -com.google.protobuf:protobuf-java:3.6.1 -io.netty:netty-buffer:4.1.31.Final -io.netty:netty-codec-http2:4.1.31.Final -io.netty:netty-codec-http:4.1.31.Final -io.netty:netty-codec-socks:4.1.31.Final -io.netty:netty-codec:4.1.31.Final -io.netty:netty-common:4.1.31.Final -io.netty:netty-handler-proxy:4.1.31.Final -io.netty:netty-handler:4.1.31.Final -io.netty:netty-resolver:4.1.31.Final -io.netty:netty-transport-native-epoll:4.1.31.Final -io.netty:netty-transport-native-unix-common:4.1.31.Final -io.netty:netty-transport:4.1.31.Final -io.projectreactor.netty:reactor-netty:0.8.5.RELEASE -io.projectreactor:reactor-core:3.2.6.RELEASE -io.projectreactor:reactor-test:3.2.6.RELEASE -io.rsocket:rsocket-core:0.11.17.2 -io.rsocket:rsocket-transport-local:0.11.17.2 -io.rsocket:rsocket-transport-netty:0.11.17.2 -javax.inject:javax.inject:1 -junit:junit:4.12 -net.bytebuddy:byte-buddy-agent:1.9.7 -net.bytebuddy:byte-buddy:1.9.7 -org.apache.logging.log4j:log4j-api:2.11.2 -org.apache.logging.log4j:log4j-core:2.11.2 -org.apache.logging.log4j:log4j-slf4j-impl:2.11.2 -org.hamcrest:hamcrest-core:1.3 -org.hdrhistogram:HdrHistogram:2.1.10 -org.mockito:mockito-core:2.25.0 -org.objenesis:objenesis:2.6 -org.reactivestreams:reactive-streams:1.0.2 -org.slf4j:slf4j-api:1.7.25 diff --git a/netifi-broker-mgmt-idl/gradle/dependency-locks/protobuf.lockfile~merged b/netifi-broker-mgmt-idl/gradle/dependency-locks/protobuf.lockfile~merged deleted file mode 100644 index 656c5dbc..00000000 --- a/netifi-broker-mgmt-idl/gradle/dependency-locks/protobuf.lockfile~merged +++ /dev/null @@ -1,3 +0,0 @@ -# This is a Gradle generated file for dependency locking. -# Manual edits can break the build and are not advised. -# This file is expected to be part of source control. diff --git a/netifi-broker-mgmt-idl/gradle/dependency-locks/testCompileClasspath.lockfile~merged b/netifi-broker-mgmt-idl/gradle/dependency-locks/testCompileClasspath.lockfile~merged deleted file mode 100644 index 597c5914..00000000 --- a/netifi-broker-mgmt-idl/gradle/dependency-locks/testCompileClasspath.lockfile~merged +++ /dev/null @@ -1,35 +0,0 @@ -# This is a Gradle generated file for dependency locking. -# Manual edits can break the build and are not advised. -# This file is expected to be part of source control. -com.google.protobuf:protobuf-java:3.6.1 -io.netty:netty-buffer:4.1.31.Final -io.netty:netty-codec-http2:4.1.31.Final -io.netty:netty-codec-http:4.1.31.Final -io.netty:netty-codec-socks:4.1.31.Final -io.netty:netty-codec:4.1.31.Final -io.netty:netty-common:4.1.31.Final -io.netty:netty-handler-proxy:4.1.31.Final -io.netty:netty-handler:4.1.31.Final -io.netty:netty-resolver:4.1.31.Final -io.netty:netty-transport-native-epoll:4.1.31.Final -io.netty:netty-transport-native-unix-common:4.1.31.Final -io.netty:netty-transport:4.1.31.Final -io.projectreactor.netty:reactor-netty:0.8.5.RELEASE -io.projectreactor:reactor-core:3.2.6.RELEASE -io.projectreactor:reactor-test:3.2.6.RELEASE -io.rsocket:rsocket-core:0.11.17.2 -io.rsocket:rsocket-transport-local:0.11.17.2 -io.rsocket:rsocket-transport-netty:0.11.17.2 -javax.inject:javax.inject:1 -junit:junit:4.12 -net.bytebuddy:byte-buddy-agent:1.9.7 -net.bytebuddy:byte-buddy:1.9.7 -org.apache.logging.log4j:log4j-api:2.11.2 -org.apache.logging.log4j:log4j-core:2.11.2 -org.apache.logging.log4j:log4j-slf4j-impl:2.11.2 -org.hamcrest:hamcrest-core:1.3 -org.hdrhistogram:HdrHistogram:2.1.10 -org.mockito:mockito-core:2.25.0 -org.objenesis:objenesis:2.6 -org.reactivestreams:reactive-streams:1.0.2 -org.slf4j:slf4j-api:1.7.25 diff --git a/netifi-common/build.gradle b/netifi-common/build.gradle index cb208183..f1e92f54 100644 --- a/netifi-common/build.gradle +++ b/netifi-common/build.gradle @@ -11,7 +11,7 @@ dependencies { testCompile 'junit:junit' testCompile 'javax.inject:javax.inject' testCompile 'io.projectreactor:reactor-test' - testCompile "com.google.protobuf:protobuf-java" + testCompile 'com.google.protobuf:protobuf-java' testCompile 'org.hdrhistogram:HdrHistogram' testCompile 'org.apache.logging.log4j:log4j-api' testCompile 'org.apache.logging.log4j:log4j-core' diff --git a/netifi-metrics-influx/src/main/java/com/netifi/broker/influx/BrokerInfluxBridge.java b/netifi-metrics-influx/src/main/java/com/netifi/broker/influx/BrokerInfluxBridge.java index 92b4720f..699aab07 100644 --- a/netifi-metrics-influx/src/main/java/com/netifi/broker/influx/BrokerInfluxBridge.java +++ b/netifi-metrics-influx/src/main/java/com/netifi/broker/influx/BrokerInfluxBridge.java @@ -16,13 +16,25 @@ package com.netifi.broker.influx; import com.google.common.util.concurrent.AtomicDouble; -import com.netifi.broker.BrokerClient; -import io.micrometer.core.instrument.*; +import com.netifi.broker.BrokerFactory; +import com.netifi.broker.RoutingBrokerService; +import io.micrometer.core.instrument.Clock; +import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.DistributionSummary; import io.micrometer.core.instrument.Meter; +import io.micrometer.core.instrument.Tag; +import io.micrometer.core.instrument.Timer; import io.micrometer.influx.InfluxConfig; import io.micrometer.influx.InfluxMeterRegistry; import io.netty.buffer.ByteBuf; -import io.rsocket.rpc.metrics.om.*; +import io.rsocket.rpc.metrics.om.MeterId; +import io.rsocket.rpc.metrics.om.MeterMeasurement; +import io.rsocket.rpc.metrics.om.MeterTag; +import io.rsocket.rpc.metrics.om.MeterType; +import io.rsocket.rpc.metrics.om.MetricsSnapshot; +import io.rsocket.rpc.metrics.om.MetricsSnapshotHandler; +import io.rsocket.rpc.metrics.om.MetricsSnapshotHandlerServer; +import io.rsocket.rpc.metrics.om.Skew; import java.time.Duration; import java.util.List; import java.util.Optional; @@ -73,15 +85,13 @@ public static void main(String... args) { logger.info("broker port - {}", brokerPort); logger.info("access key - {}", accessKey); - BrokerClient brokerClient = - BrokerClient.tcp() - .accessKey(accessKey) - .accessToken(accessToken) - .group(group) - .host(brokerHost) - .port(brokerPort) - .destination("standaloneInfluxBridge") - .build(); + RoutingBrokerService brokerClient = + BrokerFactory.connect() + .connection(spec -> spec.tcp()) + .authentication(spec -> spec.simple().key(accessKey).token(accessToken)) + .destinationInfo(spec -> spec.groupName(group).destinationTag("standaloneInfluxBridge")) + .discoveryStrategy(spec -> spec.simple(brokerPort, brokerHost)) + .toRoutingService(); InfluxConfig config = new InfluxConfig() { @@ -116,8 +126,7 @@ public String retentionDuration() { }; AtomicLong influxThreadCount = new AtomicLong(); - brokerClient.addService( - new MetricsSnapshotHandlerServer( + new MetricsSnapshotHandlerServer( new BrokerInfluxBridge( Optional.empty(), new InfluxMeterRegistry( @@ -131,7 +140,8 @@ public String retentionDuration() { })), Optional.empty(), Optional.empty(), - Optional.empty())); + Optional.empty()) + .selfRegister(brokerClient.router()); brokerClient.onClose().block(); } diff --git a/netifi-metrics-micrometer/src/main/java/com/netifi/broker/micrometer/BrokerMeterRegistrySupplier.java b/netifi-metrics-micrometer/src/main/java/com/netifi/broker/micrometer/BrokerMeterRegistrySupplier.java index 36e90804..92666ce6 100644 --- a/netifi-metrics-micrometer/src/main/java/com/netifi/broker/micrometer/BrokerMeterRegistrySupplier.java +++ b/netifi-metrics-micrometer/src/main/java/com/netifi/broker/micrometer/BrokerMeterRegistrySupplier.java @@ -16,7 +16,7 @@ package com.netifi.broker.micrometer; import com.netflix.spectator.atlas.AtlasConfig; -import com.netifi.broker.BrokerClient; +import com.netifi.broker.BrokerService; import io.micrometer.atlas.AtlasMeterRegistry; import io.micrometer.core.instrument.MeterRegistry; import io.micrometer.core.instrument.Tag; @@ -37,13 +37,12 @@ public class BrokerMeterRegistrySupplier implements Supplier { @Inject public BrokerMeterRegistrySupplier( - BrokerClient netifi, + BrokerService netifi, Optional metricsGroup, Optional stepInMillis, Optional export) { Objects.requireNonNull(netifi, "must provide a BrokerClient instance"); - BrokerSocket brokerSocket = netifi.groupServiceSocket(metricsGroup.orElse("com.netifi.broker.metrics"), com.netifi.common.tags.Tags - .empty()); + BrokerSocket brokerSocket = netifi.group(metricsGroup.orElse("com.netifi.broker.metrics")); MetricsSnapshotHandlerClient client = new MetricsSnapshotHandlerClient(brokerSocket); @@ -70,8 +69,7 @@ public Duration step() { }); List tags = - netifi - .getTags() + netifi.tags() .stream() .map(tag -> Tag.of(tag.getKey(), tag.getValue())) .collect(Collectors.toList()); @@ -79,8 +77,8 @@ public Duration step() { .config() .commonTags( Tags.of( - "accessKey", String.valueOf(netifi.getAccesskey()), - "group", netifi.getGroupName()) + "accessKey", String.valueOf(netifi.accessKey()), + "group", netifi.groupName()) .and(tags)); new BrokerOperatingSystemMetrics(registry, Collections.EMPTY_LIST); diff --git a/netifi-metrics-prometheus/src/main/java/com/netifi/broker/prometheus/BrokerPrometheusBridge.java b/netifi-metrics-prometheus/src/main/java/com/netifi/broker/prometheus/BrokerPrometheusBridge.java index 9437942d..175f6359 100644 --- a/netifi-metrics-prometheus/src/main/java/com/netifi/broker/prometheus/BrokerPrometheusBridge.java +++ b/netifi-metrics-prometheus/src/main/java/com/netifi/broker/prometheus/BrokerPrometheusBridge.java @@ -15,16 +15,28 @@ */ package com.netifi.broker.prometheus; -import com.netifi.broker.BrokerClient; -import io.micrometer.core.instrument.*; +import com.netifi.broker.BrokerFactory; +import com.netifi.broker.RoutingBrokerService; +import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.DistributionSummary; import io.micrometer.core.instrument.Meter; +import io.micrometer.core.instrument.Tag; +import io.micrometer.core.instrument.Tags; +import io.micrometer.core.instrument.Timer; import io.micrometer.core.instrument.distribution.DistributionStatisticConfig; import io.micrometer.prometheus.PrometheusConfig; import io.micrometer.prometheus.PrometheusMeterRegistry; import io.netty.buffer.ByteBuf; import io.netty.handler.codec.http.HttpResponseStatus; import io.prometheus.client.exporter.common.TextFormat; -import io.rsocket.rpc.metrics.om.*; +import io.rsocket.rpc.metrics.om.MeterId; +import io.rsocket.rpc.metrics.om.MeterMeasurement; +import io.rsocket.rpc.metrics.om.MeterTag; +import io.rsocket.rpc.metrics.om.MeterType; +import io.rsocket.rpc.metrics.om.MetricsSnapshot; +import io.rsocket.rpc.metrics.om.MetricsSnapshotHandler; +import io.rsocket.rpc.metrics.om.MetricsSnapshotHandlerServer; +import io.rsocket.rpc.metrics.om.Skew; import java.time.Duration; import java.util.List; import java.util.Optional; @@ -86,18 +98,16 @@ public static void main(String... args) { logger.info("broker port - {}", brokerPort); logger.info("access key - {}", accessKey); - BrokerClient brokerClient = - BrokerClient.tcp() - .accessKey(accessKey) - .accessToken(accessToken) - .group(group) - .host(brokerHost) - .port(brokerPort) - .destination("standalonePrometheusBridge") - .build(); + RoutingBrokerService brokerClient = + BrokerFactory.connect() + .connection(spec -> spec.tcp()) + .authentication(spec -> spec.simple().key(accessKey).token(accessToken)) + .destinationInfo( + spec -> spec.groupName(group).destinationTag("standalonePrometheusBridge")) + .discoveryStrategy(spec -> spec.simple(brokerPort, brokerHost)) + .toRoutingService(); - brokerClient.addService( - new MetricsSnapshotHandlerServer( + new MetricsSnapshotHandlerServer( new BrokerPrometheusBridge( Optional.empty(), new PrometheusMeterRegistry(PrometheusConfig.DEFAULT), @@ -106,7 +116,8 @@ public static void main(String... args) { Optional.ofNullable(metricsUrl)), Optional.empty(), Optional.empty(), - Optional.empty())); + Optional.empty()) + .selfRegister(brokerClient.router()); brokerClient.onClose().block(); } diff --git a/netifi-spring-boot-autoconfigure/build.gradle b/netifi-spring-boot-autoconfigure/build.gradle index c5a825e1..ac150cda 100644 --- a/netifi-spring-boot-autoconfigure/build.gradle +++ b/netifi-spring-boot-autoconfigure/build.gradle @@ -4,12 +4,6 @@ plugins { description = 'Netifi Spring Boot Autoconfigure' -dependencyManagement { - imports { - mavenBom "org.springframework.boot:spring-boot-dependencies:${springBootDependenciesVersion}" - } -} - dependencies { compile project(':netifi-spring-core') compile project(':netifi-spring-messaging') diff --git a/netifi-spring-boot-autoconfigure/src/main/java/com/netifi/spring/boot/BrokerClientAutoConfiguration.java b/netifi-spring-boot-autoconfigure/src/main/java/com/netifi/spring/boot/BrokerClientAutoConfiguration.java index f8ad09b6..5626de9c 100644 --- a/netifi-spring-boot-autoconfigure/src/main/java/com/netifi/spring/boot/BrokerClientAutoConfiguration.java +++ b/netifi-spring-boot-autoconfigure/src/main/java/com/netifi/spring/boot/BrokerClientAutoConfiguration.java @@ -16,23 +16,23 @@ package com.netifi.spring.boot; import com.netifi.broker.BrokerClient; +import com.netifi.broker.BrokerFactory; +import com.netifi.broker.BrokerService; +import com.netifi.broker.RoutingBrokerService; import com.netifi.broker.discovery.*; import com.netifi.broker.micrometer.BrokerMeterRegistrySupplier; -import com.netifi.broker.rsocket.transport.BrokerAddressSelectors; import com.netifi.broker.tracing.BrokerTracerSupplier; import com.netifi.common.tags.Tag; import com.netifi.common.tags.Tags; -import com.netifi.spring.boot.support.BrokerClientConfigurer; +import com.netifi.spring.boot.support.BrokerServiceConfigurer; import com.netifi.spring.core.BrokerClientTagSupplier; import com.netifi.spring.core.config.BrokerClientConfiguration; import io.micrometer.core.instrument.MeterRegistry; -import io.netty.handler.ssl.OpenSsl; -import io.netty.handler.ssl.SslContextBuilder; -import io.netty.handler.ssl.SslProvider; -import io.netty.handler.ssl.util.InsecureTrustManagerFactory; import io.opentracing.Tracer; -import io.rsocket.transport.netty.client.TcpClientTransport; -import io.rsocket.transport.netty.client.WebsocketClientTransport; +import io.rsocket.ipc.MutableRouter; +import io.rsocket.ipc.Router; +import io.rsocket.ipc.routing.SimpleRouter; +import java.time.Duration; import java.util.List; import java.util.Optional; import org.springframework.beans.factory.support.BeanDefinitionRegistryPostProcessor; @@ -52,24 +52,23 @@ import org.springframework.core.annotation.AnnotationAwareOrderComparator; import org.springframework.core.annotation.Order; import org.springframework.util.StringUtils; -import reactor.core.Exceptions; -import reactor.netty.tcp.TcpClient; @Configuration @EnableConfigurationProperties(BrokerClientProperties.class) @AutoConfigureBefore(BrokerClientConfiguration.class) public class BrokerClientAutoConfiguration { - static BrokerClient configureBrokerClient(List configurers) { - BrokerClient.CustomizableBuilder builder = BrokerClient.customizable(); + static RoutingBrokerService configureBrokerClient( + MutableRouter router, List configurers) { + BrokerFactory.ClientBuilder clientBuilder = BrokerFactory.connect(); AnnotationAwareOrderComparator.sort(configurers); - for (BrokerClientConfigurer configurer : configurers) { - builder = configurer.configure(builder); + for (BrokerServiceConfigurer configurer : configurers) { + configurer.configure(clientBuilder); } - return builder.build(); + return clientBuilder.toRoutingService(router); } @Bean(name = "internalScanClassPathBeanDefinitionRegistryPostProcessor") @@ -79,7 +78,7 @@ public BeanDefinitionRegistryPostProcessor scanClassPathBeanDefinitionRegistryPo @Bean @Order(Ordered.HIGHEST_PRECEDENCE) - public BrokerClientConfigurer propertiesBasedBrokerClientConfigurer( + public BrokerServiceConfigurer propertiesBasedBrokerClientConfigurer( BrokerClientTagSupplier brokerClientTagSupplier, BrokerClientProperties brokerClientProperties) { return builder -> { @@ -88,23 +87,19 @@ public BrokerClientConfigurer propertiesBasedBrokerClientConfigurer( BrokerClientProperties.BrokerProperties broker = brokerClientProperties.getBroker(); BrokerClientProperties.DiscoveryProperties discovery = brokerClientProperties.getDiscovery(); BrokerClientProperties.KeepAliveProperties keepalive = brokerClientProperties.getKeepalive(); - - if (!StringUtils.isEmpty(brokerClientProperties.getDestination())) { - builder.destination(brokerClientProperties.getDestination()); - } - BrokerClientProperties.ConnectionType connectionType; + DiscoveryStrategy discoveryStrategy; if (!StringUtils.isEmpty(broker.getHostname())) { // support the legacy usecase first - builder.host(broker.getHostname()); - builder.port(broker.getPort()); + discoveryStrategy = + new StaticListDiscoveryStrategy( + new StaticListDiscoveryConfig(broker.getPort(), broker.getHostname())); connectionType = broker.getConnectionType(); } else if (!StringUtils.isEmpty(discovery.getEnvironment())) { // if not legacy, then we're propbably using the new discovery api. - DiscoveryStrategy discoveryStrategy; switch (discovery.getEnvironment()) { case "static": BrokerClientProperties.DiscoveryProperties.StaticProperties staticProperties = @@ -150,120 +145,97 @@ public BrokerClientConfigurer propertiesBasedBrokerClientConfigurer( throw new RuntimeException( "unsupported discovery strategy " + discovery.getEnvironment()); } - builder.discoveryStrategy(discoveryStrategy); + } else { throw new RuntimeException("discovery not configured and required"); } - - Tags tags = Tags.empty(); - if (brokerClientProperties.getTags() != null && !brokerClientProperties.getTags().isEmpty()) { - for (String t : brokerClientProperties.getTags()) { - String[] split = t.split(":"); - Tag tag = Tag.of(split[0], split[1]); - tags = tags.and(tag); - } - } - - Tags suppliedTags = brokerClientTagSupplier.get(); - - if (suppliedTags != null) { - tags = tags.and(suppliedTags); - } - - boolean sslDisabled = ssl.isDisabled(); - - if (connectionType == BrokerClientProperties.ConnectionType.TCP) { - builder.addressSelector(BrokerAddressSelectors.TCP_ADDRESS); - builder.clientTransportFactory( - address -> { - if (sslDisabled) { - TcpClient client = TcpClient.create().addressSupplier(() -> address); - return TcpClientTransport.create(client); - } else { - TcpClient client = - TcpClient.create() - .addressSupplier(() -> address) - .secure( - spec -> { - final SslProvider sslProvider; - if (OpenSsl.isAvailable()) { - sslProvider = SslProvider.OPENSSL_REFCNT; - } else { - sslProvider = SslProvider.JDK; - } - - try { - spec.sslContext( - SslContextBuilder.forClient() - .trustManager(InsecureTrustManagerFactory.INSTANCE) - .sslProvider(sslProvider) - .build()); - } catch (Exception sslException) { - throw Exceptions.propagate(sslException); - } - }); - - return TcpClientTransport.create(client); - } - }); - } else if (connectionType == BrokerClientProperties.ConnectionType.WS) { - builder.addressSelector(BrokerAddressSelectors.WEBSOCKET_ADDRESS); - builder.tags(tags); - builder.clientTransportFactory( - address -> { - if (sslDisabled) { - TcpClient client = TcpClient.create().addressSupplier(() -> address); - return WebsocketClientTransport.create(client); - } else { - TcpClient client = - TcpClient.create() - .addressSupplier(() -> address) - .secure( - spec -> { - final SslProvider sslProvider; - if (OpenSsl.isAvailable()) { - sslProvider = SslProvider.OPENSSL_REFCNT; - } else { - sslProvider = SslProvider.JDK; - } - - try { - spec.sslContext( - SslContextBuilder.forClient() - .trustManager(InsecureTrustManagerFactory.INSTANCE) - .sslProvider(sslProvider) - .build()); - } catch (Exception sslException) { - throw Exceptions.propagate(sslException); - } - }); - return WebsocketClientTransport.create(client); - } - }); - } - - return builder - .keepalive(keepalive.isEnabled()) - .ackTimeoutSeconds(keepalive.getAckTimeoutSeconds()) - .tickPeriodSeconds(keepalive.getTickPeriodSeconds()) - .missedAcks(keepalive.getMissedAcks()) - .accessKey(access.getKey()) - .accessToken(access.getToken()) - .group(brokerClientProperties.getGroup()) - .isPublic(brokerClientProperties.isPublic()) + builder + .discoveryStrategy(spec -> spec.custom(discoveryStrategy)) + .connection( + spec -> { + boolean sslDisabled = ssl.isDisabled(); + BrokerFactory.ConnectionConfig.TcpBasedBuilder tcpSpec = null; + switch (connectionType) { + case TCP: + tcpSpec = spec.tcp(); + break; + case WS: + tcpSpec = spec.ws(); + break; + } + + if (tcpSpec != null) { + tcpSpec.ssl( + sslSpec -> { + if (sslDisabled) { + sslSpec.unsecured(); + } else { + sslSpec.secured(); + } + }); + } + }) + .destinationInfo( + spec -> { + if (!StringUtils.isEmpty(brokerClientProperties.getDestination())) { + spec.destinationTag(brokerClientProperties.getDestination()); + } + + Tags tags = Tags.empty(); + if (brokerClientProperties.getTags() != null + && !brokerClientProperties.getTags().isEmpty()) { + for (String t : brokerClientProperties.getTags()) { + String[] split = t.split(":"); + Tag tag = Tag.of(split[0], split[1]); + tags = tags.and(tag); + } + } + + Tags suppliedTags = brokerClientTagSupplier.get(); + + if (suppliedTags != null) { + tags = tags.and(suppliedTags); + } + if (brokerClientProperties.isPublic()) { + spec.asPublicDestination(); + } else { + spec.asPrivateDestination(); + } + + spec.tags(tags).groupName(brokerClientProperties.getGroup()); + }) + .keepAlive( + spec -> { + if (keepalive.isEnabled()) { + spec.configure() + .acknowledgeTimeout(Duration.ofSeconds(keepalive.getAckTimeoutSeconds())) + .tickPeriod(Duration.ofSeconds(keepalive.getTickPeriodSeconds())) + .missedAcknowledges(keepalive.getMissedAcks()); + } + }) + .authentication(spec -> spec.simple().token(access.getToken()).key(access.getKey())) .poolSize(brokerClientProperties.getPoolSize()); }; } @Configuration @ConditionalOnMissingBean(BrokerClientTagSupplier.class) - public static class BrokerTagSupplierConfiguations { + public static class BrokerTagSupplierConfigurations { @Bean public BrokerClientTagSupplier brokerClientTagSupplier() { return Tags::empty; } } + @Configuration + @ConditionalOnMissingBean(Router.class) + public static class RouterConfiguration { + @Bean + public MutableRouter mutableRouter() { + return new SimpleRouter(); + } + } + @Configuration @ConditionalOnMissingBean(MeterRegistry.class) @ConditionalOnClass(BrokerMeterRegistrySupplier.class) @@ -271,7 +243,7 @@ public static class MetricsConfigurations { @Bean public MeterRegistry meterRegistry( - BrokerClient brokerClient, BrokerClientProperties properties) { + BrokerService brokerClient, BrokerClientProperties properties) { return new BrokerMeterRegistrySupplier( brokerClient, Optional.of(properties.getMetrics().getGroup()), @@ -287,7 +259,7 @@ public MeterRegistry meterRegistry( public static class TracingConfigurations { @Bean - public Tracer tracer(BrokerClient brokerClient, BrokerClientProperties properties) { + public Tracer tracer(BrokerService brokerClient, BrokerClientProperties properties) { return new BrokerTracerSupplier(brokerClient, Optional.of(properties.getTracing().getGroup())) .get(); } @@ -295,14 +267,15 @@ public Tracer tracer(BrokerClient brokerClient, BrokerClientProperties propertie @Configuration @ConditionalOnNotWebApplication - @ConditionalOnMissingBean(BrokerClient.class) + @ConditionalOnMissingBean({BrokerService.class}) public static class NonWebBrokerClientConfiguration { @Bean - public BrokerClient brokerClient( - List configurers, + public RoutingBrokerService routingBrokerService( + MutableRouter router, + List configurers, ConfigurableApplicationContext context) { - BrokerClient brokerClient = configureBrokerClient(configurers); + RoutingBrokerService brokerClient = configureBrokerClient(router, configurers); startDaemonAwaitThread(brokerClient); @@ -318,9 +291,9 @@ public BrokerClient brokerClient( return brokerClient; } - private void startDaemonAwaitThread(BrokerClient brokerClient) { + private void startDaemonAwaitThread(BrokerService brokerClient) { Thread awaitThread = - new Thread("broker-client-thread") { + new Thread("broker-service-thread") { @Override public void run() { @@ -331,16 +304,27 @@ public void run() { awaitThread.setDaemon(false); awaitThread.start(); } + + @Bean + public BrokerClient routingBrokerService(RoutingBrokerService routingBrokerService) { + return BrokerClient.from(routingBrokerService); + } } @Configuration @ConditionalOnWebApplication - @ConditionalOnMissingBean(BrokerClient.class) + @ConditionalOnMissingBean({BrokerService.class}) public static class WebBrokerClientConfiguration { @Bean - public BrokerClient brokerClient(List configurers) { - return configureBrokerClient(configurers); + public RoutingBrokerService routingBrokerService( + MutableRouter router, List configurers) { + return configureBrokerClient(router, configurers); + } + + @Bean + public BrokerClient routingBrokerService(RoutingBrokerService routingBrokerService) { + return BrokerClient.from(routingBrokerService); } } } diff --git a/netifi-spring-boot-autoconfigure/src/main/java/com/netifi/spring/boot/BrokerClientMessagingAutoConfiguration.java b/netifi-spring-boot-autoconfigure/src/main/java/com/netifi/spring/boot/BrokerClientMessagingAutoConfiguration.java index 51412f11..c7c0988e 100644 --- a/netifi-spring-boot-autoconfigure/src/main/java/com/netifi/spring/boot/BrokerClientMessagingAutoConfiguration.java +++ b/netifi-spring-boot-autoconfigure/src/main/java/com/netifi/spring/boot/BrokerClientMessagingAutoConfiguration.java @@ -16,34 +16,34 @@ package com.netifi.spring.boot; -import com.netifi.broker.BrokerClient; +import com.netifi.broker.BrokerService; import com.netifi.spring.messaging.BrokerClientRequesterMethodArgumentResolver; import com.netifi.spring.messaging.MessagingRSocketRequesterClientFactory; +import com.netifi.spring.messaging.MessagingRouter; import io.micrometer.core.instrument.MeterRegistry; -import io.netty.buffer.ByteBufAllocator; -import io.netty.buffer.Unpooled; import io.opentracing.Tracer; import io.rsocket.AbstractRSocket; -import io.rsocket.ConnectionSetupPayload; import io.rsocket.RSocket; import io.rsocket.RSocketFactory; -import io.rsocket.frame.SetupFrameFlyweight; +import io.rsocket.ipc.MutableRouter; import io.rsocket.transport.netty.server.TcpServerTransport; -import java.time.Duration; import java.util.Optional; import org.springframework.beans.factory.support.DefaultListableBeanFactory; import org.springframework.boot.autoconfigure.AutoConfigureAfter; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.boot.autoconfigure.rsocket.RSocketProperties; import org.springframework.boot.autoconfigure.rsocket.RSocketStrategiesAutoConfiguration; import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.boot.rsocket.context.RSocketServerBootstrap; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import org.springframework.messaging.rsocket.MessageHandlerAcceptor; import org.springframework.messaging.rsocket.RSocketRequester; -import org.springframework.messaging.rsocket.RSocketRequesterMethodArgumentResolver; import org.springframework.messaging.rsocket.RSocketStrategies; +import org.springframework.messaging.rsocket.annotation.support.RSocketMessageHandler; +import org.springframework.messaging.rsocket.annotation.support.RSocketRequesterMethodArgumentResolver; +import org.springframework.util.MimeTypeUtils; /** * {@link EnableAutoConfiguration Auto-configuration} for Spring RSocket support in Spring @@ -61,62 +61,60 @@ public class BrokerClientMessagingAutoConfiguration { private static final RSocket STUB_RSOCKET = new AbstractRSocket() {}; @Bean - @ConditionalOnMissingBean - public MessageHandlerAcceptor messageHandlerAcceptor( + public MutableRouter messagingCustomizer( + RSocketProperties rSocketProperties, BrokerClientProperties brokerClientProperties, BrokerClientMessagingProperties properties, DefaultListableBeanFactory factory, RSocketStrategies rSocketStrategies, - BrokerClient brokerClient) { - BrokerClientProperties.KeepAliveProperties keepalive = brokerClientProperties.getKeepalive(); - Duration tickPeriod = Duration.ofSeconds(keepalive.getTickPeriodSeconds()); - Duration ackTimeout = Duration.ofSeconds(keepalive.getAckTimeoutSeconds()); - int missedAcks = keepalive.getMissedAcks(); + RSocketMessageHandler handler) { + return new MessagingRouter( + MimeTypeUtils.ALL, + MimeTypeUtils.ALL, + rSocketStrategies.metadataExtractor(), + handler, + rSocketStrategies.routeMatcher(), + rSocketStrategies); + } - ConnectionSetupPayload connectionSetupPayload = - // FIXME: hardcoded mime for responder - ConnectionSetupPayload.create( - SetupFrameFlyweight.encode( - ByteBufAllocator.DEFAULT, - false, - (int) tickPeriod.toMillis(), - (int) (ackTimeout.toMillis() + tickPeriod.toMillis() * missedAcks), - Unpooled.EMPTY_BUFFER, - "text/plain", - "text/plain", - Unpooled.EMPTY_BUFFER, - Unpooled.EMPTY_BUFFER)); - MessageHandlerAcceptor acceptor = new MessageHandlerAcceptor(); - acceptor.setRSocketStrategies(rSocketStrategies); - acceptor + @Bean + @ConditionalOnMissingBean + public RSocketServerBootstrap messageHandlerAcceptor( + BrokerClientMessagingProperties properties, + DefaultListableBeanFactory factory, + RSocketStrategies rSocketStrategies, + RSocketMessageHandler handler, + BrokerService brokerClient, + Optional registry, + Optional tracer) { + RSocketServerBootstrap bootstrap = new NetifiBootstrap(brokerClient); + + handler .getArgumentResolverConfigurer() .getCustomResolvers() .removeIf(r -> r instanceof RSocketRequesterMethodArgumentResolver); - acceptor + handler .getArgumentResolverConfigurer() .addCustomResolver( new BrokerClientRequesterMethodArgumentResolver( - properties.getName(), brokerClient, factory, rSocketStrategies)); - - brokerClient.addNamedRSocket( - properties.getName(), acceptor.apply(connectionSetupPayload, STUB_RSOCKET)); + brokerClient, + factory, + rSocketStrategies, + registry.orElse(null), + tracer.orElse(null))); - return acceptor; + return bootstrap; } @Bean public MessagingRSocketRequesterClientFactory messagingRSocketRequesterClientFactory( BrokerClientMessagingProperties properties, - BrokerClient brokerClient, + BrokerService brokerClient, RSocketStrategies rSocketStrategies, Optional registry, Optional tracer) { return new MessagingRSocketRequesterClientFactory( - properties.getName(), - brokerClient, - registry.orElse(null), - tracer.orElse(null), - rSocketStrategies); + brokerClient, registry.orElse(null), tracer.orElse(null), rSocketStrategies); } } diff --git a/netifi-spring-boot-autoconfigure/src/main/java/com/netifi/spring/boot/BrokerClientMessagingProperties.java b/netifi-spring-boot-autoconfigure/src/main/java/com/netifi/spring/boot/BrokerClientMessagingProperties.java index 2b01234f..97df5b74 100644 --- a/netifi-spring-boot-autoconfigure/src/main/java/com/netifi/spring/boot/BrokerClientMessagingProperties.java +++ b/netifi-spring-boot-autoconfigure/src/main/java/com/netifi/spring/boot/BrokerClientMessagingProperties.java @@ -15,8 +15,6 @@ */ package com.netifi.spring.boot; -import javax.validation.constraints.NotEmpty; -import javax.validation.constraints.NotNull; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.validation.annotation.Validated; @@ -26,15 +24,4 @@ */ @ConfigurationProperties("netifi.client.messaging") @Validated -public class BrokerClientMessagingProperties { - - @NotEmpty @NotNull private String name = "spring-messaging"; - - public String getName() { - return name; - } - - public void setName(String name) { - this.name = name; - } -} +public class BrokerClientMessagingProperties {} diff --git a/netifi-spring-boot-autoconfigure/src/main/java/com/netifi/spring/boot/NetifiBootstrap.java b/netifi-spring-boot-autoconfigure/src/main/java/com/netifi/spring/boot/NetifiBootstrap.java new file mode 100644 index 00000000..68529430 --- /dev/null +++ b/netifi-spring-boot-autoconfigure/src/main/java/com/netifi/spring/boot/NetifiBootstrap.java @@ -0,0 +1,70 @@ +/* + * Copyright 2019 The Netifi Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.netifi.spring.boot; + +import com.netifi.broker.BrokerService; +import java.net.InetSocketAddress; +import org.springframework.boot.rsocket.context.RSocketServerBootstrap; +import org.springframework.boot.rsocket.context.RSocketServerInitializedEvent; +import org.springframework.boot.rsocket.server.RSocketServer; +import org.springframework.boot.rsocket.server.RSocketServerException; +import org.springframework.context.ApplicationEventPublisher; + +public class NetifiBootstrap extends RSocketServerBootstrap { + + private final BrokerService brokerClient; + private ApplicationEventPublisher eventPublisher; + + public NetifiBootstrap(BrokerService client) { + super((__) -> null, (setup, sendingSocket) -> null); + brokerClient = client; + } + + @Override + public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) { + this.eventPublisher = applicationEventPublisher; + } + + @Override + public void start() { + this.eventPublisher.publishEvent( + new RSocketServerInitializedEvent( + new RSocketServer() { + @Override + public void start() throws RSocketServerException {} + + @Override + public void stop() throws RSocketServerException { + brokerClient.dispose(); + } + + @Override + public InetSocketAddress address() { + return InetSocketAddress.createUnresolved("localhost", 0); + } + })); + } + + @Override + public void stop() { + this.brokerClient.dispose(); + } + + @Override + public boolean isRunning() { + return !brokerClient.isDisposed(); + } +} diff --git a/netifi-spring-boot-autoconfigure/src/main/java/com/netifi/spring/boot/support/BrokerClientConfigurer.java b/netifi-spring-boot-autoconfigure/src/main/java/com/netifi/spring/boot/support/BrokerClientConfigurer.java index 5bc067d6..1fb7c039 100644 --- a/netifi-spring-boot-autoconfigure/src/main/java/com/netifi/spring/boot/support/BrokerClientConfigurer.java +++ b/netifi-spring-boot-autoconfigure/src/main/java/com/netifi/spring/boot/support/BrokerClientConfigurer.java @@ -16,9 +16,15 @@ package com.netifi.spring.boot.support; import com.netifi.broker.BrokerClient; +import com.netifi.broker.BrokerFactory; @FunctionalInterface -public interface BrokerClientConfigurer { +@Deprecated +public interface BrokerClientConfigurer extends BrokerServiceConfigurer { BrokerClient.CustomizableBuilder configure(BrokerClient.CustomizableBuilder builder); + + default void configure(BrokerFactory.ClientBuilder builder) { + // FIXME + } } diff --git a/netifi-spring-boot-autoconfigure/src/main/java/com/netifi/spring/boot/support/BrokerServiceConfigurer.java b/netifi-spring-boot-autoconfigure/src/main/java/com/netifi/spring/boot/support/BrokerServiceConfigurer.java new file mode 100644 index 00000000..5b065ed4 --- /dev/null +++ b/netifi-spring-boot-autoconfigure/src/main/java/com/netifi/spring/boot/support/BrokerServiceConfigurer.java @@ -0,0 +1,24 @@ +/* + * Copyright 2019 The Netifi Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.netifi.spring.boot.support; + +import com.netifi.broker.BrokerFactory; + +@FunctionalInterface +public interface BrokerServiceConfigurer { + + void configure(BrokerFactory.ClientBuilder builder); +} diff --git a/netifi-spring-boot-autoconfigure/src/test/java/com/netifi/spring/boot/test/BrokerClientSpringIntegrationTest.java b/netifi-spring-boot-autoconfigure/src/test/java/com/netifi/spring/boot/test/BrokerClientSpringIntegrationTest.java index 71993379..5e580981 100644 --- a/netifi-spring-boot-autoconfigure/src/test/java/com/netifi/spring/boot/test/BrokerClientSpringIntegrationTest.java +++ b/netifi-spring-boot-autoconfigure/src/test/java/com/netifi/spring/boot/test/BrokerClientSpringIntegrationTest.java @@ -18,7 +18,6 @@ import com.netifi.broker.info.BrokerInfoService; import com.netifi.broker.info.BrokerInfoServiceClient; import com.netifi.spring.DefaultExternalIdlClient; -import com.netifi.spring.boot.BrokerClientAutoConfiguration; import com.netifi.spring.core.BroadcastAwareClientFactory; import com.netifi.spring.core.DestinationAwareClientFactory; import com.netifi.spring.core.GroupAwareClientFactory; @@ -26,7 +25,6 @@ import com.netifi.spring.core.annotation.BrokerClient; import com.netifi.spring.core.annotation.Destination; import com.netifi.spring.core.annotation.Group; -import com.netifi.spring.core.config.BrokerClientConfiguration; import io.rsocket.rpc.metrics.om.MetricsSnapshotHandler; import io.rsocket.rpc.metrics.om.MetricsSnapshotHandlerClient; import org.junit.jupiter.api.Assertions; @@ -35,13 +33,11 @@ import org.mockito.Mockito; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.config.ConfigurableListableBeanFactory; -import org.springframework.boot.autoconfigure.ImportAutoConfiguration; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.context.ConfigurableApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.ComponentScan; import org.springframework.context.annotation.Primary; -import org.springframework.test.annotation.DirtiesContext; import org.springframework.test.context.junit.jupiter.SpringExtension; @ExtendWith(SpringExtension.class) @@ -87,6 +83,8 @@ public class BrokerClientSpringIntegrationTest { @Autowired com.netifi.broker.BrokerClient brokerClient; + @Autowired com.netifi.broker.BrokerService brokerService; + @Autowired ConfigurableApplicationContext context; @Test @@ -114,5 +112,11 @@ static class TestConfiguration { public com.netifi.broker.BrokerClient mockedBrokerClient() { return Mockito.mock(com.netifi.broker.BrokerClient.class); } + + @Bean + @Primary + public com.netifi.broker.RoutingBrokerService mockedRoutingBrokerService() { + return Mockito.mock(com.netifi.broker.RoutingBrokerService.class); + } } } diff --git a/netifi-spring-boot-autoconfigure/src/test/java/com/netifi/spring/boot/test/ClientConfigurationIntegrationTest.java b/netifi-spring-boot-autoconfigure/src/test/java/com/netifi/spring/boot/test/ClientConfigurationIntegrationTest.java index ad4daa00..a140bc4e 100644 --- a/netifi-spring-boot-autoconfigure/src/test/java/com/netifi/spring/boot/test/ClientConfigurationIntegrationTest.java +++ b/netifi-spring-boot-autoconfigure/src/test/java/com/netifi/spring/boot/test/ClientConfigurationIntegrationTest.java @@ -18,9 +18,7 @@ import static org.mockito.ArgumentMatchers.any; import com.netifi.broker.BrokerClient; -import com.netifi.spring.boot.BrokerClientAutoConfiguration; import com.netifi.spring.boot.support.BrokerClientConfigurer; -import com.netifi.spring.core.config.BrokerClientConfiguration; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -30,19 +28,21 @@ import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.ComponentScan; import org.springframework.test.context.junit.jupiter.SpringExtension; @ExtendWith(SpringExtension.class) -@SpringBootTest +@SpringBootTest(classes = { + com.netifi.spring.boot.BrokerClientAutoConfiguration.class, + com.netifi.spring.boot.BrokerClientMessagingAutoConfiguration.class, + com.netifi.spring.core.config.BrokerClientConfiguration.class +}) public class ClientConfigurationIntegrationTest { @Autowired @Qualifier("mock2") BrokerClientConfigurer configurer; - @Autowired - BrokerClient brokerClient; + @Autowired BrokerClient brokerClient; @Test public void testThatConfigurerWorks() { @@ -56,7 +56,7 @@ public void testThatConfigurerWorks() { } @org.springframework.boot.test.context.TestConfiguration -// @ComponentScan + // @ComponentScan static class TestConfiguration { @Bean diff --git a/netifi-spring-boot-autoconfigure/src/test/java/com/netifi/spring/boot/test/MessageHandler.java b/netifi-spring-boot-autoconfigure/src/test/java/com/netifi/spring/boot/test/MessageHandlerTest.java similarity index 93% rename from netifi-spring-boot-autoconfigure/src/test/java/com/netifi/spring/boot/test/MessageHandler.java rename to netifi-spring-boot-autoconfigure/src/test/java/com/netifi/spring/boot/test/MessageHandlerTest.java index 022146b3..1a9ecb58 100644 --- a/netifi-spring-boot-autoconfigure/src/test/java/com/netifi/spring/boot/test/MessageHandler.java +++ b/netifi-spring-boot-autoconfigure/src/test/java/com/netifi/spring/boot/test/MessageHandlerTest.java @@ -22,10 +22,10 @@ @Controller @MessageMapping("test") -public class MessageHandler { +public class MessageHandlerTest { @MessageMapping("process") public Mono process(@Payload Mono data) { - return Mono.just("Echo: " + data); + return data.map(m -> "Echo: " + m); } } diff --git a/netifi-spring-boot-autoconfigure/src/test/java/com/netifi/spring/boot/test/SpringMessagingIntegrationTest.java b/netifi-spring-boot-autoconfigure/src/test/java/com/netifi/spring/boot/test/SpringMessagingIntegrationTest.java index 55111375..ce4d4871 100644 --- a/netifi-spring-boot-autoconfigure/src/test/java/com/netifi/spring/boot/test/SpringMessagingIntegrationTest.java +++ b/netifi-spring-boot-autoconfigure/src/test/java/com/netifi/spring/boot/test/SpringMessagingIntegrationTest.java @@ -29,7 +29,7 @@ public class SpringMessagingIntegrationTest { public static GenericContainer redis = - new GenericContainer("netifi/broker:1.6.4") + new GenericContainer("netifi/broker:1.6.10") .withExposedPorts(8001, 7001, 6001, 8101) .withEnv( "BROKER_SERVER_OPTS", @@ -50,7 +50,8 @@ public class SpringMessagingIntegrationTest { @Test public void tests() { Assert.assertNotNull(requester.rsocket()); - - requester.route("test.process").data("test").retrieveMono(String.class).log().block(); + Assert.assertEquals( + "Echo: test", + requester.route("test.process").data("test").retrieveMono(String.class).log().block()); } } diff --git a/netifi-spring-boot-autoconfigure/src/test/java/com/netifi/spring/boot/test/TestApplication.java b/netifi-spring-boot-autoconfigure/src/test/java/com/netifi/spring/boot/test/TestApplication.java deleted file mode 100644 index 2f32858b..00000000 --- a/netifi-spring-boot-autoconfigure/src/test/java/com/netifi/spring/boot/test/TestApplication.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Copyright 2019 The Netifi Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.netifi.spring.boot.test; - -import com.netifi.spring.boot.BrokerClientAutoConfiguration; -import com.netifi.spring.boot.BrokerClientMessagingAutoConfiguration; -import com.netifi.spring.core.config.BrokerClientConfiguration; -import org.springframework.boot.SpringApplication; -import org.springframework.boot.autoconfigure.ImportAutoConfiguration; -import org.springframework.boot.autoconfigure.SpringBootApplication; - -@SpringBootApplication -@ImportAutoConfiguration({ - BrokerClientMessagingAutoConfiguration.class, - BrokerClientAutoConfiguration.class, - BrokerClientConfiguration.class, -}) -public class TestApplication { - public static void main(String[] args) { - SpringApplication.run(TestApplication.class, args); - } -} diff --git a/netifi-spring-boot-autoconfigure/src/test/java/com/netifi/spring/boot/test/TestIdlServiceServer.java b/netifi-spring-boot-autoconfigure/src/test/java/com/netifi/spring/boot/test/TestIdlServiceServer.java index 327144f8..f52ef3bc 100644 --- a/netifi-spring-boot-autoconfigure/src/test/java/com/netifi/spring/boot/test/TestIdlServiceServer.java +++ b/netifi-spring-boot-autoconfigure/src/test/java/com/netifi/spring/boot/test/TestIdlServiceServer.java @@ -15,14 +15,8 @@ */ package com.netifi.spring.boot.test; -import java.util.Map; - -import io.rsocket.Payload; -import io.rsocket.ipc.util.IPCChannelFunction; -import io.rsocket.ipc.util.IPCFunction; +import io.rsocket.ipc.MutableRouter; import io.rsocket.rpc.AbstractRSocketService; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; @javax.annotation.Generated( value = "by RSocket RPC proto compiler (version 0.2.2)", @@ -39,10 +33,5 @@ public Class getServiceClass() { } @Override - public void selfRegister(Map>> fireAndForgetRegistry, - Map>> requestResponseRegistry, - Map>> requestStreamRegistry, - Map requestChannelRegistry) { - - } + public void selfRegister(MutableRouter router) {} } diff --git a/netifi-spring-boot-autoconfigure/src/test/resources/META-INF/spring.factories b/netifi-spring-boot-autoconfigure/src/test/resources/META-INF/spring.factories new file mode 100644 index 00000000..181bad26 --- /dev/null +++ b/netifi-spring-boot-autoconfigure/src/test/resources/META-INF/spring.factories @@ -0,0 +1,5 @@ +# Auto Configure +org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ + com.netifi.spring.boot.BrokerClientAutoConfiguration,\ + com.netifi.spring.boot.BrokerClientMessagingAutoConfiguration,\ + com.netifi.spring.core.config.BrokerClientConfiguration \ No newline at end of file diff --git a/netifi-spring-boot-starter/build.gradle b/netifi-spring-boot-starter/build.gradle index d34fe877..f3175f5a 100644 --- a/netifi-spring-boot-starter/build.gradle +++ b/netifi-spring-boot-starter/build.gradle @@ -7,6 +7,7 @@ description = 'Netifi Spring Boot Starter' dependencies { compile project(':netifi-spring-boot-autoconfigure') compile project(':netifi-spring-core') + compile project(':netifi-spring-messaging') compile 'org.springframework.boot:spring-boot-starter' } diff --git a/netifi-spring-core/build.gradle b/netifi-spring-core/build.gradle index 4563680e..28c1626d 100644 --- a/netifi-spring-core/build.gradle +++ b/netifi-spring-core/build.gradle @@ -4,14 +4,6 @@ plugins { description = 'Netifi Spring Core' -dependencyManagement { - imports { - mavenBom "org.springframework.boot:spring-boot-dependencies:${springBootDependenciesVersion}" - mavenBom "io.netty:netty-bom:${nettyVersion}" - mavenBom "io.projectreactor:reactor-bom:${reactorBomVersion}" - } -} - dependencies { compile project(':netifi-common') compile project(':netifi-broker-client') diff --git a/netifi-spring-core/src/main/java/com/netifi/spring/core/BrokerClientApplicationEventListener.java b/netifi-spring-core/src/main/java/com/netifi/spring/core/BrokerClientApplicationEventListener.java index 72f9e3fc..52413160 100644 --- a/netifi-spring-core/src/main/java/com/netifi/spring/core/BrokerClientApplicationEventListener.java +++ b/netifi-spring-core/src/main/java/com/netifi/spring/core/BrokerClientApplicationEventListener.java @@ -15,7 +15,7 @@ */ package com.netifi.spring.core; -import com.netifi.broker.BrokerClient; +import com.netifi.broker.RoutingBrokerService; import io.rsocket.rpc.RSocketRpcService; import java.util.Map; import org.springframework.context.ApplicationContext; @@ -23,9 +23,9 @@ import org.springframework.context.event.EventListener; public class BrokerClientApplicationEventListener { - private final BrokerClient brokerClient; + private final RoutingBrokerService brokerClient; - public BrokerClientApplicationEventListener(BrokerClient brokerClient) { + public BrokerClientApplicationEventListener(RoutingBrokerService brokerClient) { this.brokerClient = brokerClient; } @@ -35,6 +35,6 @@ public void onApplicationEvent(ContextRefreshedEvent event) { Map rSocketServiceMap = context.getBeansOfType(RSocketRpcService.class); - rSocketServiceMap.values().forEach(brokerClient::addService); + rSocketServiceMap.values().forEach(s -> s.selfRegister(brokerClient.router())); } } diff --git a/netifi-spring-core/src/main/java/com/netifi/spring/core/annotation/BrokerClientBeanDefinitionRegistryPostProcessor.java b/netifi-spring-core/src/main/java/com/netifi/spring/core/annotation/BrokerClientBeanDefinitionRegistryPostProcessor.java index 24f55806..04d93bb7 100644 --- a/netifi-spring-core/src/main/java/com/netifi/spring/core/annotation/BrokerClientBeanDefinitionRegistryPostProcessor.java +++ b/netifi-spring-core/src/main/java/com/netifi/spring/core/annotation/BrokerClientBeanDefinitionRegistryPostProcessor.java @@ -204,12 +204,12 @@ private static Class resolveClass(BeanDefinition beanDefinition) private static ResolvableType resolveResolvableType(BeanDefinition beanDefinition) { if (beanDefinition instanceof RootBeanDefinition) { - return ((RootBeanDefinition) beanDefinition).getResolvableType(); + return ResolvableType.forClass(((RootBeanDefinition) beanDefinition).getTargetType()); } else if (beanDefinition instanceof GenericBeanDefinition) { return ResolvableType.forClass(((GenericBeanDefinition) beanDefinition).getBeanClass()); + } else { + return beanDefinition.getResolvableType(); } - - throw new IllegalArgumentException("Impossible to resolve bean type"); } private static boolean findRealImplementationAndMarkAsPrimary( diff --git a/netifi-spring-core/src/main/java/com/netifi/spring/core/annotation/BrokerClientStaticFactory.java b/netifi-spring-core/src/main/java/com/netifi/spring/core/annotation/BrokerClientStaticFactory.java index dac66236..636a683f 100644 --- a/netifi-spring-core/src/main/java/com/netifi/spring/core/annotation/BrokerClientStaticFactory.java +++ b/netifi-spring-core/src/main/java/com/netifi/spring/core/annotation/BrokerClientStaticFactory.java @@ -49,7 +49,7 @@ public class BrokerClientStaticFactory { /** * Creates an instance of the correct Netifi Broker client for injection into a annotated field. * - * @return an instance of a {@link com.netifi.broker.BrokerClient} client + * @return an instance of a {@link com.netifi.broker.BrokerService} client */ public static Object getBeanInstance( final DefaultListableBeanFactory beanFactory, @@ -234,7 +234,7 @@ private static String getBeanName(BrokerClient brokerClientAnnotation, Class } static T createBrokerClient( - com.netifi.broker.BrokerClient brokerClient, + com.netifi.broker.BrokerService brokerClient, BrokerClient.Type routeType, String group, String destination, @@ -272,7 +272,7 @@ static T createBrokerClient( } public static BrokerSocket createBrokerRSocket( - com.netifi.broker.BrokerClient brokerClient, + com.netifi.broker.BrokerService brokerClient, BrokerClient.Type routeType, String group, String destination, @@ -281,14 +281,14 @@ public static BrokerSocket createBrokerRSocket( switch (routeType) { case BROADCAST: - brokerSocket = brokerClient.broadcastServiceSocket(group, tags); + brokerSocket = brokerClient.broadcast(group, tags); break; case GROUP: - brokerSocket = brokerClient.groupServiceSocket(group, tags); + brokerSocket = brokerClient.group(group, tags); break; case DESTINATION: brokerSocket = - brokerClient.groupServiceSocket( + brokerClient.group( group, StringUtils.isEmpty(destination) ? tags diff --git a/netifi-spring-core/src/main/java/com/netifi/spring/core/annotation/RpcBrokerClientFactorySupport.java b/netifi-spring-core/src/main/java/com/netifi/spring/core/annotation/RpcBrokerClientFactorySupport.java index 292fc87f..8dee60cd 100644 --- a/netifi-spring-core/src/main/java/com/netifi/spring/core/annotation/RpcBrokerClientFactorySupport.java +++ b/netifi-spring-core/src/main/java/com/netifi/spring/core/annotation/RpcBrokerClientFactorySupport.java @@ -8,12 +8,12 @@ public class RpcBrokerClientFactorySupport implements BrokerClientFactorySupport { - private final com.netifi.broker.BrokerClient brokerClient; + private final com.netifi.broker.BrokerService brokerClient; private final Tracer tracer; private final MeterRegistry meterRegistry; public RpcBrokerClientFactorySupport( - com.netifi.broker.BrokerClient client, MeterRegistry registry, Tracer tracer) { + com.netifi.broker.BrokerService client, MeterRegistry registry, Tracer tracer) { brokerClient = client; this.tracer = tracer; meterRegistry = registry; diff --git a/netifi-spring-core/src/main/java/com/netifi/spring/core/config/BrokerClientConfiguration.java b/netifi-spring-core/src/main/java/com/netifi/spring/core/config/BrokerClientConfiguration.java index 1d45417b..58f74ce9 100644 --- a/netifi-spring-core/src/main/java/com/netifi/spring/core/config/BrokerClientConfiguration.java +++ b/netifi-spring-core/src/main/java/com/netifi/spring/core/config/BrokerClientConfiguration.java @@ -15,7 +15,8 @@ */ package com.netifi.spring.core.config; -import com.netifi.broker.BrokerClient; +import com.netifi.broker.BrokerService; +import com.netifi.broker.RoutingBrokerService; import com.netifi.broker.info.BrokerInfoService; import com.netifi.broker.info.BrokerInfoServiceClient; import com.netifi.broker.info.BrokerInfoServiceServer; @@ -36,16 +37,15 @@ import org.springframework.context.annotation.AnnotatedBeanDefinitionReader; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import org.springframework.core.annotation.Order; @Configuration public class BrokerClientConfiguration implements ApplicationContextAware { @Bean public RpcBrokerClientFactorySupport rpcBrokerClientFactorySupport( - BrokerClient brokerClient, Optional registry, Optional tracer) { + BrokerService brokerService, Optional registry, Optional tracer) { return new RpcBrokerClientFactorySupport( - brokerClient, registry.orElse(null), tracer.orElse(null)); + brokerService, registry.orElse(null), tracer.orElse(null)); } @Bean(name = "internalBrokerClientBeanDefinitionRegistryPostProcessor") @@ -56,7 +56,7 @@ public RpcBrokerClientFactorySupport rpcBrokerClientFactorySupport( @Bean public BrokerClientApplicationEventListener brokerClientApplicationEventListener( - BrokerClient brokerClient) { + RoutingBrokerService brokerClient) { return new BrokerClientApplicationEventListener(brokerClient); } diff --git a/netifi-spring-core/src/test/java/com/netifi/spring/core/TestIdlServiceServer.java b/netifi-spring-core/src/test/java/com/netifi/spring/core/TestIdlServiceServer.java index b20f6344..69b5957b 100644 --- a/netifi-spring-core/src/test/java/com/netifi/spring/core/TestIdlServiceServer.java +++ b/netifi-spring-core/src/test/java/com/netifi/spring/core/TestIdlServiceServer.java @@ -15,14 +15,8 @@ */ package com.netifi.spring.core; -import java.util.Map; - -import io.rsocket.Payload; -import io.rsocket.ipc.util.IPCChannelFunction; -import io.rsocket.ipc.util.IPCFunction; +import io.rsocket.ipc.MutableRouter; import io.rsocket.rpc.AbstractRSocketService; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; @javax.annotation.Generated( value = "by RSocket RPC proto compiler (version 0.2.10)", @@ -39,10 +33,5 @@ public Class getServiceClass() { } @Override - public void selfRegister(Map>> fireAndForgetRegistry, - Map>> requestResponseRegistry, - Map>> requestStreamRegistry, - Map requestChannelRegistry) { - - } + public void selfRegister(MutableRouter router) {} } diff --git a/netifi-spring-core/src/test/java/com/netifi/spring/core/TestableConfiguration.java b/netifi-spring-core/src/test/java/com/netifi/spring/core/TestableConfiguration.java index 385f0378..9b9ffe2d 100644 --- a/netifi-spring-core/src/test/java/com/netifi/spring/core/TestableConfiguration.java +++ b/netifi-spring-core/src/test/java/com/netifi/spring/core/TestableConfiguration.java @@ -16,6 +16,7 @@ package com.netifi.spring.core; import com.netifi.broker.BrokerClient; +import com.netifi.broker.RoutingBrokerService; import com.netifi.spring.core.config.EnableBrokerClient; import org.mockito.Mockito; import org.springframework.context.annotation.Bean; @@ -30,6 +31,12 @@ public BrokerClient brokerClient() { return Mockito.mock(BrokerClient.class); } + @Bean + public RoutingBrokerService routingBrokerService() { + return Mockito.mock(RoutingBrokerService.class); + } + + @Bean public TestIdlImpl testIdlImpl() { return new TestIdlImpl(); diff --git a/netifi-spring-messaging/build.gradle b/netifi-spring-messaging/build.gradle index 06f75503..0a2eb58e 100644 --- a/netifi-spring-messaging/build.gradle +++ b/netifi-spring-messaging/build.gradle @@ -4,12 +4,6 @@ plugins { description = 'Netifi Spring Core' -dependencyManagement { - imports { - mavenBom "org.springframework.boot:spring-boot-dependencies:${springBootDependenciesVersion}" - } -} - dependencies { compile project(":netifi-spring-core") diff --git a/netifi-spring-messaging/src/main/java/com/netifi/spring/messaging/BrokerClientRequesterMethodArgumentResolver.java b/netifi-spring-messaging/src/main/java/com/netifi/spring/messaging/BrokerClientRequesterMethodArgumentResolver.java index db7d9830..689ec02e 100644 --- a/netifi-spring-messaging/src/main/java/com/netifi/spring/messaging/BrokerClientRequesterMethodArgumentResolver.java +++ b/netifi-spring-messaging/src/main/java/com/netifi/spring/messaging/BrokerClientRequesterMethodArgumentResolver.java @@ -1,17 +1,17 @@ /* - * Copyright 2002-2019 the original author or authors. + * Copyright 2019 The Netifi Authors * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at * - * https://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package com.netifi.spring.messaging; @@ -21,7 +21,9 @@ import static com.netifi.spring.messaging.RSocketRequesterStaticFactory.resolveBrokerClientRSocket; import static org.springframework.core.annotation.AnnotatedElementUtils.getMergedAnnotation; -import com.netifi.broker.BrokerClient; +import com.netifi.broker.BrokerService; +import io.micrometer.core.instrument.MeterRegistry; +import io.opentracing.Tracer; import io.rsocket.RSocket; import org.springframework.beans.factory.support.DefaultListableBeanFactory; import org.springframework.core.MethodParameter; @@ -32,29 +34,25 @@ import org.springframework.util.Assert; import reactor.core.publisher.Mono; -/** - * Resolves arguments of type {@link RSocket} that can be used for making requests to the remote - * peer. - * - * @author Rossen Stoyanchev - * @since 5.2 - */ public class BrokerClientRequesterMethodArgumentResolver implements HandlerMethodArgumentResolver { - private final String rSocketName; - private final BrokerClient brokerClient; + private final BrokerService brokerClient; private final DefaultListableBeanFactory listableBeanFactory; private final RSocketStrategies rSocketStrategies; + private final MeterRegistry registry; + private final Tracer tracer; public BrokerClientRequesterMethodArgumentResolver( - String rSocketName, - BrokerClient client, + BrokerService client, DefaultListableBeanFactory factory, - RSocketStrategies strategies) { - this.rSocketName = rSocketName; - brokerClient = client; - listableBeanFactory = factory; - rSocketStrategies = strategies; + RSocketStrategies strategies, + MeterRegistry registry, + Tracer tracer) { + this.brokerClient = client; + this.listableBeanFactory = factory; + this.rSocketStrategies = strategies; + this.registry = registry; + this.tracer = tracer; } @Override @@ -77,15 +75,15 @@ public Mono resolveArgument(MethodParameter parameter, Message messag if (RSocketRequester.class.equals(type)) { return Mono.just( createRSocketRequester( - rSocketName, brokerClient, brokerClientAnnotation, resolveTags(listableBeanFactory, brokerClientAnnotation), - rSocketStrategies)); + rSocketStrategies, + registry, + tracer)); } else if (RSocket.class.isAssignableFrom(type)) { return Mono.just( resolveBrokerClientRSocket( - rSocketName, brokerClient, brokerClientAnnotation.type(), brokerClientAnnotation.group(), diff --git a/netifi-spring-messaging/src/main/java/com/netifi/spring/messaging/MessagingRSocketRequesterClientFactory.java b/netifi-spring-messaging/src/main/java/com/netifi/spring/messaging/MessagingRSocketRequesterClientFactory.java index e4c5b457..85efb460 100644 --- a/netifi-spring-messaging/src/main/java/com/netifi/spring/messaging/MessagingRSocketRequesterClientFactory.java +++ b/netifi-spring-messaging/src/main/java/com/netifi/spring/messaging/MessagingRSocketRequesterClientFactory.java @@ -1,3 +1,18 @@ +/* + * Copyright 2019 The Netifi Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package com.netifi.spring.messaging; import static com.netifi.spring.messaging.RSocketRequesterStaticFactory.createRSocketRequester; @@ -12,19 +27,16 @@ public class MessagingRSocketRequesterClientFactory implements BrokerClientFactorySupport { - private final String rSocketName; - private final com.netifi.broker.BrokerClient brokerClient; + private final com.netifi.broker.BrokerService brokerClient; private final Tracer tracer; private final MeterRegistry meterRegistry; private final RSocketStrategies rSocketStrategies; public MessagingRSocketRequesterClientFactory( - String rSocketName, - com.netifi.broker.BrokerClient brokerClient, + com.netifi.broker.BrokerService brokerClient, MeterRegistry meterRegistry, Tracer tracer, RSocketStrategies strategies) { - this.rSocketName = rSocketName; this.brokerClient = brokerClient; this.tracer = tracer; this.meterRegistry = meterRegistry; @@ -42,7 +54,6 @@ public T lookup( Class tClass, BrokerClient.Type type, String methodGroup, Tags methodTags) { return (T) createRSocketRequester( - rSocketName, brokerClient, type, methodGroup, diff --git a/netifi-spring-messaging/src/main/java/com/netifi/spring/messaging/MessagingRouter.java b/netifi-spring-messaging/src/main/java/com/netifi/spring/messaging/MessagingRouter.java new file mode 100644 index 00000000..b6e280ef --- /dev/null +++ b/netifi-spring-messaging/src/main/java/com/netifi/spring/messaging/MessagingRouter.java @@ -0,0 +1,335 @@ +/* + * Copyright 2019 The Netifi Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.netifi.spring.messaging; + +import io.rsocket.ConnectionSetupPayload; +import io.rsocket.Payload; +import io.rsocket.frame.FrameType; +import io.rsocket.ipc.MutableRouter; +import io.rsocket.ipc.util.IPCChannelFunction; +import io.rsocket.ipc.util.IPCFunction; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Function; +import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.core.io.buffer.DataBufferUtils; +import org.springframework.core.io.buffer.NettyDataBuffer; +import org.springframework.lang.Nullable; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageHeaders; +import org.springframework.messaging.ReactiveMessageHandler; +import org.springframework.messaging.handler.DestinationPatternsMessageCondition; +import org.springframework.messaging.handler.invocation.reactive.HandlerMethodReturnValueHandler; +import org.springframework.messaging.rsocket.MetadataExtractor; +import org.springframework.messaging.rsocket.PayloadUtils; +import org.springframework.messaging.rsocket.RSocketStrategies; +import org.springframework.messaging.rsocket.annotation.support.RSocketFrameTypeMessageCondition; +import org.springframework.messaging.rsocket.annotation.support.RSocketPayloadReturnValueHandler; +import org.springframework.messaging.support.MessageBuilder; +import org.springframework.messaging.support.MessageHeaderAccessor; +import org.springframework.util.Assert; +import org.springframework.util.MimeType; +import org.springframework.util.RouteMatcher; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.core.publisher.MonoProcessor; + +public class MessagingRouter implements MutableRouter { + + private final MimeType dataMimeType; + + private final MimeType metadataMimeType; + + private final MetadataExtractor metadataExtractor; + + private final ReactiveMessageHandler messageHandler; + + private final RouteMatcher routeMatcher; + + private final RSocketStrategies strategies; + + public MessagingRouter( + MimeType dataMimeType, + MimeType metadataMimeType, + MetadataExtractor metadataExtractor, + ReactiveMessageHandler messageHandler, + RouteMatcher routeMatcher, + RSocketStrategies strategies) { + + Assert.notNull(dataMimeType, "'dataMimeType' is required"); + Assert.notNull(metadataMimeType, "'metadataMimeType' is required"); + Assert.notNull(metadataExtractor, "MetadataExtractor is required"); + Assert.notNull(messageHandler, "ReactiveMessageHandler is required"); + Assert.notNull(routeMatcher, "RouteMatcher is required"); + Assert.notNull(strategies, "RSocketStrategies is required"); + + this.dataMimeType = dataMimeType; + this.metadataMimeType = metadataMimeType; + this.metadataExtractor = metadataExtractor; + this.messageHandler = messageHandler; + this.routeMatcher = routeMatcher; + this.strategies = strategies; + } + + /** + * Wrap the {@link ConnectionSetupPayload} with a {@link Message} and delegate to {@link + * #handle(Payload, FrameType)} for handling. + * + * @param payload the connection payload + * @return completion handle for success or error + */ + public Mono handleConnectionSetupPayload(ConnectionSetupPayload payload) { + // frameDecoder does not apply to connectionSetupPayload + // so retain here since handle expects it.. + payload.retain(); + return handle(payload, FrameType.SETUP); + } + + @Override + public IPCFunction> routeFireAndForget(String route) { + return (payload, decoded) -> + decoded.isComposite() + ? this.handle(payload.retain(), FrameType.REQUEST_FNF) + : this.handle(payload.retain(), route, FrameType.REQUEST_FNF); + } + + @Override + public IPCFunction> routeRequestResponse(String route) { + return (payload, decoded) -> + decoded.isComposite() + ? handleAndReply(payload, FrameType.REQUEST_RESPONSE, Flux.just(payload.retain())) + .next() + : handleAndReply( + payload, route, FrameType.REQUEST_RESPONSE, Flux.just(payload.retain())) + .next(); + } + + @Override + public IPCFunction> routeRequestStream(String route) { + return (payload, decoded) -> + decoded.isComposite() + ? handleAndReply(payload, FrameType.REQUEST_STREAM, Flux.just(payload.retain())) + : handleAndReply(payload, route, FrameType.REQUEST_STREAM, Flux.just(payload.retain())); + } + + @Override + public IPCChannelFunction routeRequestChannel(String route) { + return (source, payload, decoded) -> + decoded.isComposite() + ? handleAndReply(payload, FrameType.REQUEST_CHANNEL, source) + : handleAndReply(payload, route, FrameType.REQUEST_CHANNEL, source); + } + + @Override + public MessagingRouter withFireAndForgetRoute(String route, IPCFunction> function) { + return null; + } + + @Override + public MessagingRouter withRequestResponseRoute( + String route, IPCFunction> function) { + return null; + } + + @Override + public MessagingRouter withRequestStreamRoute(String route, IPCFunction> function) { + return null; + } + + @Override + public MessagingRouter withRequestChannelRoute(String route, IPCChannelFunction function) { + return null; + } + + // @Override + // public Mono fireAndForget(Payload payload) { + // return handle(payload, FrameType.REQUEST_FNF); + // } + // + // @Override + // public Mono requestResponse(Payload payload) { + // return handleAndReply(payload, FrameType.REQUEST_RESPONSE, Flux.just(payload)).next(); + // } + // + // @Override + // public Flux requestStream(Payload payload) { + // return handleAndReply(payload, FrameType.REQUEST_STREAM, Flux.just(payload)); + // } + // + // @Override + // public Flux requestChannel(Publisher payloads) { + // return Flux.from(payloads) + // .switchOnFirst((signal, innerFlux) -> { + // Payload firstPayload = signal.get(); + // return firstPayload == null ? innerFlux : + // handleAndReply(firstPayload, FrameType.REQUEST_CHANNEL, innerFlux); + // }); + // } + // + // @Override + // public Mono metadataPush(Payload payload) { + // // Not very useful until createHeaders does more with metadata + // return handle(payload, FrameType.METADATA_PUSH); + // } + + private Mono handle(Payload payload, FrameType frameType) { + MessageHeaders headers = createHeaders(payload, frameType, null); + DataBuffer dataBuffer = retainDataAndReleasePayload(payload); + int refCount = refCount(dataBuffer); + Message message = MessageBuilder.createMessage(dataBuffer, headers); + return Mono.defer(() -> this.messageHandler.handleMessage(message)) + .doFinally( + s -> { + if (refCount(dataBuffer) == refCount) { + DataBufferUtils.release(dataBuffer); + } + }); + } + + private Mono handle(Payload payload, String route, FrameType frameType) { + MessageHeaders headers = createHeaders(route, frameType, null); + DataBuffer dataBuffer = retainDataAndReleasePayload(payload); + int refCount = refCount(dataBuffer); + Message message = MessageBuilder.createMessage(dataBuffer, headers); + return Mono.defer(() -> this.messageHandler.handleMessage(message)) + .doFinally( + s -> { + if (refCount(dataBuffer) == refCount) { + DataBufferUtils.release(dataBuffer); + } + }); + } + + private int refCount(DataBuffer dataBuffer) { + return dataBuffer instanceof NettyDataBuffer + ? ((NettyDataBuffer) dataBuffer).getNativeBuffer().refCnt() + : 1; + } + + private Flux handleAndReply( + Payload firstPayload, FrameType frameType, Flux payloads) { + MonoProcessor> replyMono = MonoProcessor.create(); + MessageHeaders headers = createHeaders(firstPayload, frameType, replyMono); + + AtomicBoolean read = new AtomicBoolean(); + Flux buffers = + payloads.map(this::retainDataAndReleasePayload).doOnSubscribe(s -> read.set(true)); + Message> message = MessageBuilder.createMessage(buffers, headers); + + return Mono.defer(() -> this.messageHandler.handleMessage(message)) + .doFinally( + s -> { + // Subscription should have happened by now due to ChannelSendOperator + if (!read.get()) { + buffers.subscribe(DataBufferUtils::release); + } + }) + .thenMany( + Flux.defer( + () -> + replyMono.isTerminated() + ? replyMono.flatMapMany(Function.identity()) + : Mono.error( + new IllegalStateException( + "Something went wrong: reply Mono not set")))); + } + + private Flux handleAndReply( + Payload firstPayload, String route, FrameType frameType, Flux payloads) { + MonoProcessor> replyMono = MonoProcessor.create(); + MessageHeaders headers = createHeaders(route, frameType, replyMono); + + AtomicBoolean read = new AtomicBoolean(); + Flux buffers = + payloads.map(this::retainDataAndReleasePayload).doOnSubscribe(s -> read.set(true)); + Message> message = MessageBuilder.createMessage(buffers, headers); + + return Mono.defer(() -> this.messageHandler.handleMessage(message)) + .doFinally( + s -> { + // Subscription should have happened by now due to ChannelSendOperator + if (!read.get()) { + buffers.subscribe(DataBufferUtils::release); + } + }) + .thenMany( + Flux.defer( + () -> + replyMono.isTerminated() + ? replyMono.flatMapMany(Function.identity()) + : Mono.error( + new IllegalStateException( + "Something went wrong: reply Mono not set")))); + } + + private DataBuffer retainDataAndReleasePayload(Payload payload) { + return PayloadUtils.retainDataAndReleasePayload(payload, this.strategies.dataBufferFactory()); + } + + private MessageHeaders createHeaders( + Payload payload, FrameType frameType, @Nullable MonoProcessor replyMono) { + + MessageHeaderAccessor headers = new MessageHeaderAccessor(); + headers.setLeaveMutable(true); + + Map metadataValues = + this.metadataExtractor.extract(payload, this.metadataMimeType); + + metadataValues.putIfAbsent(MetadataExtractor.ROUTE_KEY, ""); + for (Map.Entry entry : metadataValues.entrySet()) { + if (entry.getKey().equals(MetadataExtractor.ROUTE_KEY)) { + RouteMatcher.Route route = this.routeMatcher.parseRoute((String) entry.getValue()); + headers.setHeader(DestinationPatternsMessageCondition.LOOKUP_DESTINATION_HEADER, route); + } else { + headers.setHeader(entry.getKey(), entry.getValue()); + } + } + + headers.setContentType(this.dataMimeType); + headers.setHeader(RSocketFrameTypeMessageCondition.FRAME_TYPE_HEADER, frameType); + if (replyMono != null) { + headers.setHeader(RSocketPayloadReturnValueHandler.RESPONSE_HEADER, replyMono); + } + headers.setHeader( + HandlerMethodReturnValueHandler.DATA_BUFFER_FACTORY_HEADER, + this.strategies.dataBufferFactory()); + + return headers.getMessageHeaders(); + } + + private MessageHeaders createHeaders( + String route, FrameType frameType, @Nullable MonoProcessor replyMono) { + + MessageHeaderAccessor headers = new MessageHeaderAccessor(); + headers.setLeaveMutable(true); + + headers.setHeader( + DestinationPatternsMessageCondition.LOOKUP_DESTINATION_HEADER, + this.routeMatcher.parseRoute(route)); + + headers.setContentType(this.dataMimeType); + headers.setHeader(RSocketFrameTypeMessageCondition.FRAME_TYPE_HEADER, frameType); + if (replyMono != null) { + headers.setHeader(RSocketPayloadReturnValueHandler.RESPONSE_HEADER, replyMono); + } + headers.setHeader( + HandlerMethodReturnValueHandler.DATA_BUFFER_FACTORY_HEADER, + this.strategies.dataBufferFactory()); + + return headers.getMessageHeaders(); + } +} diff --git a/netifi-spring-messaging/src/main/java/com/netifi/spring/messaging/MetricsAwareRSocketRequester.java b/netifi-spring-messaging/src/main/java/com/netifi/spring/messaging/MetricsAwareRSocketRequester.java index ec0aabd4..2c2a204d 100644 --- a/netifi-spring-messaging/src/main/java/com/netifi/spring/messaging/MetricsAwareRSocketRequester.java +++ b/netifi-spring-messaging/src/main/java/com/netifi/spring/messaging/MetricsAwareRSocketRequester.java @@ -1,12 +1,29 @@ +/* + * Copyright 2019 The Netifi Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package com.netifi.spring.messaging; import io.micrometer.core.instrument.MeterRegistry; import io.opentracing.Tracer; import io.rsocket.RSocket; import io.rsocket.rpc.metrics.Metrics; -import org.reactivestreams.Publisher; +import java.util.function.Consumer; import org.springframework.core.ParameterizedTypeReference; import org.springframework.messaging.rsocket.RSocketRequester; +import org.springframework.messaging.rsocket.RSocketStrategies; +import org.springframework.util.MimeType; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -17,7 +34,10 @@ public class MetricsAwareRSocketRequester implements RSocketRequester { private final Tracer tracer; public MetricsAwareRSocketRequester( - RSocketRequester requester, MeterRegistry meterRegistry, Tracer tracer) { + RSocketRequester requester, + RSocketStrategies rSocketStrategies, + MeterRegistry meterRegistry, + Tracer tracer) { this.rSocketRequester = requester; this.registry = meterRegistry; this.tracer = tracer; @@ -29,8 +49,24 @@ public RSocket rsocket() { } @Override - public RequestSpec route(String route) { - return new MetricsAwareRequestSpec(rSocketRequester.route(route), route, registry, tracer); + public MimeType dataMimeType() { + return rSocketRequester.dataMimeType(); + } + + @Override + public MimeType metadataMimeType() { + return rSocketRequester.metadataMimeType(); + } + + @Override + public RequestSpec route(String route, Object... routeVars) { + return new MetricsAwareRequestSpec( + rSocketRequester.route(route, routeVars), route, registry, tracer); + } + + @Override + public RequestSpec metadata(Object metadata, MimeType mimeType) { + return null; } private static final class MetricsAwareRequestSpec implements RequestSpec { @@ -49,40 +85,36 @@ private MetricsAwareRequestSpec( } @Override - public ResponseSpec data(Object data) { - return requestSpec.data(data); + public RequestSpec metadata(Consumer> configurer) { + return null; } @Override - public > ResponseSpec data(P publisher, Class dataType) { - return requestSpec.data(publisher, dataType); + public RetrieveSpec data(Object data) { + requestSpec.data(data); + return this; } @Override - public > ResponseSpec data( - P publisher, ParameterizedTypeReference dataTypeRef) { - return requestSpec.data(publisher, dataTypeRef); + public RetrieveSpec data(Object producer, Class elementClass) { + requestSpec.data(producer, elementClass); + return this; } - } - private static final class MetricsAwareResponseSepc implements ResponseSpec { - - private final ResponseSpec responseSpec; - private final String route; - private final MeterRegistry registry; - private final Tracer tracer; + @Override + public RetrieveSpec data(Object producer, ParameterizedTypeReference elementTypeRef) { + requestSpec.data(producer, elementTypeRef); + return this; + } - private MetricsAwareResponseSepc( - ResponseSpec spec, String route, MeterRegistry registry, Tracer tracer) { - this.responseSpec = spec; - this.route = route; - this.registry = registry; - this.tracer = tracer; + @Override + public RequestSpec metadata(Object metadata, MimeType mimeType) { + return requestSpec; } @Override public Mono send() { - return responseSpec + return requestSpec .send() .transform( Metrics.timed( @@ -91,9 +123,9 @@ public Mono send() { @Override public Mono retrieveMono(Class dataType) { - return responseSpec + return requestSpec .retrieveMono(dataType) - .transform( + .transform( Metrics.timed( registry, "rsocket.spring.client", @@ -107,9 +139,9 @@ public Mono retrieveMono(Class dataType) { @Override public Mono retrieveMono(ParameterizedTypeReference dataTypeRef) { - return responseSpec + return requestSpec .retrieveMono(dataTypeRef) - .transform( + .transform( Metrics.timed( registry, "rsocket.spring.client", @@ -123,9 +155,9 @@ public Mono retrieveMono(ParameterizedTypeReference dataTypeRef) { @Override public Flux retrieveFlux(Class dataType) { - return responseSpec + return requestSpec .retrieveFlux(dataType) - .transform( + .transform( Metrics.timed( registry, "rsocket.spring.client", @@ -139,9 +171,9 @@ public Flux retrieveFlux(Class dataType) { @Override public Flux retrieveFlux(ParameterizedTypeReference dataTypeRef) { - return responseSpec + return requestSpec .retrieveFlux(dataTypeRef) - .transform( + .transform( Metrics.timed( registry, "rsocket.spring.client", diff --git a/netifi-spring-messaging/src/main/java/com/netifi/spring/messaging/RSocketRequesterStaticFactory.java b/netifi-spring-messaging/src/main/java/com/netifi/spring/messaging/RSocketRequesterStaticFactory.java index 84c37c97..0610dfc7 100644 --- a/netifi-spring-messaging/src/main/java/com/netifi/spring/messaging/RSocketRequesterStaticFactory.java +++ b/netifi-spring-messaging/src/main/java/com/netifi/spring/messaging/RSocketRequesterStaticFactory.java @@ -1,38 +1,69 @@ +/* + * Copyright 2019 The Netifi Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package com.netifi.spring.messaging; -import com.netifi.broker.BrokerClient; +import com.netifi.broker.BrokerService; import com.netifi.broker.rsocket.BrokerSocket; -import com.netifi.broker.rsocket.NamedRSocketClientWrapper; import com.netifi.common.tags.Tags; import com.netifi.spring.core.annotation.BrokerClientStaticFactory; import io.micrometer.core.instrument.MeterRegistry; import io.opentracing.Tracer; import org.springframework.messaging.rsocket.RSocketRequester; import org.springframework.messaging.rsocket.RSocketStrategies; +import org.springframework.util.MimeTypeUtils; class RSocketRequesterStaticFactory { static RSocketRequester createRSocketRequester( - String rSocketName, - BrokerClient brokerClient, + BrokerService brokerClient, com.netifi.spring.core.annotation.BrokerClient brokerClientAnnotation, Tags tags, - RSocketStrategies rSocketStrategies) { - return RSocketRequester.create( + RSocketStrategies rSocketStrategies, + MeterRegistry meterRegistry, + Tracer tracer) { + if (meterRegistry != null && tracer != null) { + return new MetricsAwareRSocketRequester( + RSocketRequester.wrap( + resolveBrokerClientRSocket( + brokerClient, + brokerClientAnnotation.type(), + brokerClientAnnotation.group(), + brokerClientAnnotation.destination(), + tags), + MimeTypeUtils.ALL, + MimeTypeUtils.ALL, + rSocketStrategies), + rSocketStrategies, + meterRegistry, + tracer); + } + return RSocketRequester.wrap( resolveBrokerClientRSocket( - rSocketName, brokerClient, brokerClientAnnotation.type(), brokerClientAnnotation.group(), brokerClientAnnotation.destination(), tags), - null, + MimeTypeUtils.ALL, + MimeTypeUtils.ALL, rSocketStrategies); } static RSocketRequester createRSocketRequester( - String rSocketName, - BrokerClient brokerClient, + BrokerService brokerClient, com.netifi.spring.core.annotation.BrokerClient.Type routingType, String group, String destination, @@ -41,26 +72,32 @@ static RSocketRequester createRSocketRequester( MeterRegistry meterRegistry, Tracer tracer) { - return new MetricsAwareRSocketRequester( - RSocketRequester.wrap( - resolveBrokerClientRSocket( - rSocketName, brokerClient, routingType, group, destination, tags), - null, - rSocketStrategies), - meterRegistry, - tracer); + if (meterRegistry != null && tracer != null) { + return new MetricsAwareRSocketRequester( + RSocketRequester.wrap( + resolveBrokerClientRSocket(brokerClient, routingType, group, destination, tags), + MimeTypeUtils.ALL, + MimeTypeUtils.ALL, + rSocketStrategies), + rSocketStrategies, + meterRegistry, + tracer); + } else { + return RSocketRequester.wrap( + resolveBrokerClientRSocket(brokerClient, routingType, group, destination, tags), + MimeTypeUtils.ALL, + MimeTypeUtils.ALL, + rSocketStrategies); + } } static BrokerSocket resolveBrokerClientRSocket( - String rSocketName, - BrokerClient brokerClient, + BrokerService brokerClient, com.netifi.spring.core.annotation.BrokerClient.Type routingType, String group, String destination, Tags tags) { - return NamedRSocketClientWrapper.wrap( - rSocketName, - BrokerClientStaticFactory.createBrokerRSocket( - brokerClient, routingType, group, destination, tags)); + return BrokerClientStaticFactory.createBrokerRSocket( + brokerClient, routingType, group, destination, tags); } } diff --git a/netifi-spring-messaging/src/test/java/com/netifi/spring/core/TestIdlServiceServer.java b/netifi-spring-messaging/src/test/java/com/netifi/spring/core/TestIdlServiceServer.java index 7f4945b7..69b5957b 100644 --- a/netifi-spring-messaging/src/test/java/com/netifi/spring/core/TestIdlServiceServer.java +++ b/netifi-spring-messaging/src/test/java/com/netifi/spring/core/TestIdlServiceServer.java @@ -15,6 +15,7 @@ */ package com.netifi.spring.core; +import io.rsocket.ipc.MutableRouter; import io.rsocket.rpc.AbstractRSocketService; @javax.annotation.Generated( @@ -30,4 +31,7 @@ public class TestIdlServiceServer extends AbstractRSocketService { public Class getServiceClass() { return TestIdl.class; } + + @Override + public void selfRegister(MutableRouter router) {} } diff --git a/netifi-tracing-idl/gradle/dependency-locks/compile.lockfile~merged b/netifi-tracing-idl/gradle/dependency-locks/compile.lockfile~merged deleted file mode 100644 index 656c5dbc..00000000 --- a/netifi-tracing-idl/gradle/dependency-locks/compile.lockfile~merged +++ /dev/null @@ -1,3 +0,0 @@ -# This is a Gradle generated file for dependency locking. -# Manual edits can break the build and are not advised. -# This file is expected to be part of source control. diff --git a/netifi-tracing-openzipkin/src/main/java/com/netifi/broker/tracing/BrokerTracerSupplier.java b/netifi-tracing-openzipkin/src/main/java/com/netifi/broker/tracing/BrokerTracerSupplier.java index 0ed7057e..7e815aba 100644 --- a/netifi-tracing-openzipkin/src/main/java/com/netifi/broker/tracing/BrokerTracerSupplier.java +++ b/netifi-tracing-openzipkin/src/main/java/com/netifi/broker/tracing/BrokerTracerSupplier.java @@ -17,7 +17,7 @@ import brave.Tracing; import brave.opentracing.BraveTracer; -import com.netifi.broker.BrokerClient; +import com.netifi.broker.BrokerService; import com.netifi.broker.rsocket.BrokerSocket; import io.opentracing.Tracer; import java.util.Optional; @@ -30,7 +30,7 @@ public class BrokerTracerSupplier implements Supplier { private final Tracer tracer; @Inject - public BrokerTracerSupplier(BrokerClient brokerClient, Optional tracingGroup) { + public BrokerTracerSupplier(BrokerService brokerClient, Optional tracingGroup) { BrokerSocket brokerSocket = brokerClient.group(tracingGroup.orElse("com.netifi.broker.tracing")); @@ -38,7 +38,7 @@ public BrokerTracerSupplier(BrokerClient brokerClient, Optional tracingG new BrokerTracingServiceClient(brokerSocket); BrokerReporter reporter = new BrokerReporter( - brokerTracingServiceClient, brokerClient.getGroupName(), brokerClient.getTags()); + brokerTracingServiceClient, brokerClient.groupName(), brokerClient.tags()); Tracing tracing = Tracing.newBuilder().spanReporter(reporter).build(); diff --git a/netifi-tracing-openzipkin/src/main/java/com/netifi/broker/tracing/BrokerZipkinHttpBridge.java b/netifi-tracing-openzipkin/src/main/java/com/netifi/broker/tracing/BrokerZipkinHttpBridge.java index afc38d8c..e746598f 100644 --- a/netifi-tracing-openzipkin/src/main/java/com/netifi/broker/tracing/BrokerZipkinHttpBridge.java +++ b/netifi-tracing-openzipkin/src/main/java/com/netifi/broker/tracing/BrokerZipkinHttpBridge.java @@ -16,7 +16,8 @@ package com.netifi.broker.tracing; import com.google.protobuf.InvalidProtocolBufferException; -import com.netifi.broker.BrokerClient; +import com.netifi.broker.BrokerFactory; +import com.netifi.broker.RoutingBrokerService; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelOption; import java.time.Duration; @@ -75,22 +76,20 @@ public static void main(String... args) { logger.info("zipkin spans url - {}", zipkinSpansUrl); logger.info("access key - {}", accessKey); - BrokerClient brokerClient = - BrokerClient.tcp() - .accessKey(accessKey) - .accessToken(accessToken) - .group(group) - .host(brokerHost) - .port(brokerPort) - .destination("standaloneZipkinBridge") - .build(); + RoutingBrokerService brokerClient = + BrokerFactory.connect() + .authentication(spec -> spec.simple().key(accessKey).token(accessToken)) + .connection(spec -> spec.tcp()) + .destinationInfo(spec -> spec.groupName(group).destinationTag("standaloneZipkinBridge")) + .discoveryStrategy(spec -> spec.simple(brokerPort, brokerHost)) + .toRoutingService(); - brokerClient.addService( - new BrokerTracingServiceServer( + new BrokerTracingServiceServer( new BrokerZipkinHttpBridge(zipkinHost, zipkinPort, zipkinSpansUrl), Optional.empty(), Optional.empty(), - Optional.empty())); + Optional.empty()) + .selfRegister(brokerClient.router()); brokerClient.onClose().block(); } diff --git a/settings.gradle b/settings.gradle index 648038f4..62e1b778 100644 --- a/settings.gradle +++ b/settings.gradle @@ -10,6 +10,10 @@ // } //} +plugins { + id 'com.gradle.enterprise' version '3.1' +} + rootProject.name = 'netifi-java' include 'netifi-bom' @@ -30,3 +34,10 @@ include 'netifi-spring-core' include 'netifi-spring-messaging' include 'netifi-spring-boot-starter' include 'netifi-spring-boot-autoconfigure' + +gradleEnterprise { + buildScan { + termsOfServiceUrl = 'https://gradle.com/terms-of-service' + termsOfServiceAgree = 'yes' + } +}