Skip to content

Commit

Permalink
TracingCorePublisher should be able to handle any type of Subscriber (#…
Browse files Browse the repository at this point in the history
…46)

* TracingCorePublisher should be able to handle any type of Subscriber
  • Loading branch information
n0tl3ss authored Mar 17, 2022
1 parent f11a263 commit 8ce712d
Show file tree
Hide file tree
Showing 3 changed files with 124 additions and 1 deletion.
3 changes: 3 additions & 0 deletions tracing-core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,7 @@ dependencies {
compileOnly libs.kotlinx.coroutines.reactor

testImplementation mn.reactor
testImplementation mn.rxjava2
testImplementation mn.micronaut.rxjava2.http.client
testImplementation mn.micronaut.reactor.http.client
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.reactivestreams.Subscriber;
import reactor.core.CorePublisher;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Operators;
import reactor.util.context.Context;

/**
Expand Down Expand Up @@ -159,7 +160,7 @@ protected void doSubscribe(Subscriber<? super T> actual,
ScopeManager scopeManager,
Span span,
boolean finishOnClose) {
CoreSubscriber<? extends T> coreActual = (CoreSubscriber<? extends T>) actual;
CoreSubscriber<? super T> coreActual = Operators.toCoreSubscriber(actual);
publisher.subscribe(new TracingCoreSubscriber(scopeManager, span, actual, finishOnClose, coreActual.currentContext()));
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package io.micronaut.tracing.instrument.util

import io.micronaut.context.ApplicationContext
import io.micronaut.core.annotation.Introspected
import io.micronaut.http.HttpRequest
import io.micronaut.http.annotation.Body
import io.micronaut.http.annotation.Controller
import io.micronaut.http.annotation.Get
import io.micronaut.http.annotation.Header
import io.micronaut.http.annotation.Post
import io.micronaut.http.client.annotation.Client
import io.micronaut.reactor.http.client.ReactorHttpClient
import io.micronaut.runtime.server.EmbeddedServer
import io.micronaut.rxjava2.http.client.RxHttpClient
import io.micronaut.scheduling.annotation.ExecuteOn
import io.micronaut.tracing.annotation.ContinueSpan
import io.micronaut.tracing.annotation.NewSpan
import io.reactivex.Flowable
import io.reactivex.Single
import jakarta.inject.Inject
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.util.function.Tuple2
import reactor.util.function.Tuples
import spock.lang.AutoCleanup
import spock.lang.Shared
import spock.lang.Specification


import static io.micronaut.scheduling.TaskExecutors.IO

class ReactorRx2JavaSpec extends Specification {

private static final Logger LOG = LoggerFactory.getLogger(ReactorRx2JavaSpec)

@Shared
@AutoCleanup
EmbeddedServer embeddedServer = ApplicationContext.run(EmbeddedServer)

@Shared
@AutoCleanup
RxHttpClient rxHttpClient = RxHttpClient.create(embeddedServer.URL)

void "test RxJava2 integration"() {
expect:
List<Tuple2> result = Flux.range(1, 1)
.flatMap {
String tracingId = UUID.randomUUID()
HttpRequest<Object> request = HttpRequest
.POST("/rxjava2/enter", new SomeBody())
.header("X-TrackingId", tracingId)
return Mono.from(rxHttpClient.retrieve(request)).map(response -> {
Tuples.of(tracingId, response)
})
}
.collectList()
.block()
for (Tuple2 t : result)
assert t.getT1() == t.getT2()
}

@Introspected
static class SomeBody {
}

@Controller("/rxjava2")
static class RxJava2Controller {

@Inject
@Client("/")
private RxHttpClient rxHttpClient

@Inject
@Client("/")
private ReactorHttpClient reactorHttpClient

@ExecuteOn(IO)
@ContinueSpan
@Post("/enter")
Single<String> test(@Header("X-TrackingId") String tracingId, @Body SomeBody body) {
LOG.info("enter")
return Single.fromPublisher(
reactorHttpClient.retrieve(HttpRequest
.GET("/rxjava2/test")
.header("X-TrackingId", tracingId), String)
)
}

@ExecuteOn(IO)
@Get("/test")
@ContinueSpan
Mono<String> testRxJava2(@Header("X-TrackingId") String tracingId) {
LOG.info("test")
return Mono.from(
rxHttpClient.exchange(HttpRequest
.GET("/rxjava2/test2")
.header("X-TrackingId", tracingId), String)
)
}

@ExecuteOn(IO)
@Get("/test2")
@ContinueSpan
String test2RxJava2(@Header("X-TrackingId") String tracingId) {
LOG.info("test2")
return Flux.from(trackingIdRxJava2(tracingId)).blockFirst()
}


@NewSpan("test")
Flowable<String> trackingIdRxJava2(String tracingId) {
return Flowable.just(tracingId, tracingId, tracingId)
}

}

}

0 comments on commit 8ce712d

Please sign in to comment.