From 8ce712d4236d33332de27416a6016689688728f8 Mon Sep 17 00:00:00 2001 From: Nemanja Mikic <44323106+n0tl3ss@users.noreply.github.com> Date: Thu, 17 Mar 2022 10:48:44 +0100 Subject: [PATCH] TracingCorePublisher should be able to handle any type of Subscriber (#46) * TracingCorePublisher should be able to handle any type of Subscriber --- tracing-core/build.gradle | 3 + .../instrument/util/TracingCorePublisher.java | 3 +- .../instrument/util/ReactorRx2JavaSpec.groovy | 119 ++++++++++++++++++ 3 files changed, 124 insertions(+), 1 deletion(-) create mode 100644 tracing-core/src/test/groovy/io/micronaut/tracing/instrument/util/ReactorRx2JavaSpec.groovy diff --git a/tracing-core/build.gradle b/tracing-core/build.gradle index f2f6aa01a..9b9fd3337 100644 --- a/tracing-core/build.gradle +++ b/tracing-core/build.gradle @@ -13,4 +13,7 @@ dependencies { compileOnly libs.kotlinx.coroutines.reactor testImplementation mn.reactor + testImplementation mn.rxjava2 + testImplementation mn.micronaut.rxjava2.http.client + testImplementation mn.micronaut.reactor.http.client } diff --git a/tracing-core/src/main/java/io/micronaut/tracing/instrument/util/TracingCorePublisher.java b/tracing-core/src/main/java/io/micronaut/tracing/instrument/util/TracingCorePublisher.java index 7fa1dbcba..744da5591 100644 --- a/tracing-core/src/main/java/io/micronaut/tracing/instrument/util/TracingCorePublisher.java +++ b/tracing-core/src/main/java/io/micronaut/tracing/instrument/util/TracingCorePublisher.java @@ -22,6 +22,7 @@ import org.reactivestreams.Subscriber; import reactor.core.CorePublisher; import reactor.core.CoreSubscriber; +import reactor.core.publisher.Operators; import reactor.util.context.Context; /** @@ -159,7 +160,7 @@ protected void doSubscribe(Subscriber actual, ScopeManager scopeManager, Span span, boolean finishOnClose) { - CoreSubscriber coreActual = (CoreSubscriber) actual; + CoreSubscriber coreActual = Operators.toCoreSubscriber(actual); publisher.subscribe(new TracingCoreSubscriber(scopeManager, span, actual, finishOnClose, coreActual.currentContext())); } diff --git a/tracing-core/src/test/groovy/io/micronaut/tracing/instrument/util/ReactorRx2JavaSpec.groovy b/tracing-core/src/test/groovy/io/micronaut/tracing/instrument/util/ReactorRx2JavaSpec.groovy new file mode 100644 index 000000000..ffd2a96d0 --- /dev/null +++ b/tracing-core/src/test/groovy/io/micronaut/tracing/instrument/util/ReactorRx2JavaSpec.groovy @@ -0,0 +1,119 @@ +package io.micronaut.tracing.instrument.util + +import io.micronaut.context.ApplicationContext +import io.micronaut.core.annotation.Introspected +import io.micronaut.http.HttpRequest +import io.micronaut.http.annotation.Body +import io.micronaut.http.annotation.Controller +import io.micronaut.http.annotation.Get +import io.micronaut.http.annotation.Header +import io.micronaut.http.annotation.Post +import io.micronaut.http.client.annotation.Client +import io.micronaut.reactor.http.client.ReactorHttpClient +import io.micronaut.runtime.server.EmbeddedServer +import io.micronaut.rxjava2.http.client.RxHttpClient +import io.micronaut.scheduling.annotation.ExecuteOn +import io.micronaut.tracing.annotation.ContinueSpan +import io.micronaut.tracing.annotation.NewSpan +import io.reactivex.Flowable +import io.reactivex.Single +import jakarta.inject.Inject +import org.slf4j.Logger +import org.slf4j.LoggerFactory +import reactor.core.publisher.Flux +import reactor.core.publisher.Mono +import reactor.util.function.Tuple2 +import reactor.util.function.Tuples +import spock.lang.AutoCleanup +import spock.lang.Shared +import spock.lang.Specification + + +import static io.micronaut.scheduling.TaskExecutors.IO + +class ReactorRx2JavaSpec extends Specification { + + private static final Logger LOG = LoggerFactory.getLogger(ReactorRx2JavaSpec) + + @Shared + @AutoCleanup + EmbeddedServer embeddedServer = ApplicationContext.run(EmbeddedServer) + + @Shared + @AutoCleanup + RxHttpClient rxHttpClient = RxHttpClient.create(embeddedServer.URL) + + void "test RxJava2 integration"() { + expect: + List result = Flux.range(1, 1) + .flatMap { + String tracingId = UUID.randomUUID() + HttpRequest request = HttpRequest + .POST("/rxjava2/enter", new SomeBody()) + .header("X-TrackingId", tracingId) + return Mono.from(rxHttpClient.retrieve(request)).map(response -> { + Tuples.of(tracingId, response) + }) + } + .collectList() + .block() + for (Tuple2 t : result) + assert t.getT1() == t.getT2() + } + + @Introspected + static class SomeBody { + } + + @Controller("/rxjava2") + static class RxJava2Controller { + + @Inject + @Client("/") + private RxHttpClient rxHttpClient + + @Inject + @Client("/") + private ReactorHttpClient reactorHttpClient + + @ExecuteOn(IO) + @ContinueSpan + @Post("/enter") + Single test(@Header("X-TrackingId") String tracingId, @Body SomeBody body) { + LOG.info("enter") + return Single.fromPublisher( + reactorHttpClient.retrieve(HttpRequest + .GET("/rxjava2/test") + .header("X-TrackingId", tracingId), String) + ) + } + + @ExecuteOn(IO) + @Get("/test") + @ContinueSpan + Mono testRxJava2(@Header("X-TrackingId") String tracingId) { + LOG.info("test") + return Mono.from( + rxHttpClient.exchange(HttpRequest + .GET("/rxjava2/test2") + .header("X-TrackingId", tracingId), String) + ) + } + + @ExecuteOn(IO) + @Get("/test2") + @ContinueSpan + String test2RxJava2(@Header("X-TrackingId") String tracingId) { + LOG.info("test2") + return Flux.from(trackingIdRxJava2(tracingId)).blockFirst() + } + + + @NewSpan("test") + Flowable trackingIdRxJava2(String tracingId) { + return Flowable.just(tracingId, tracingId, tracingId) + } + + } + +}