diff --git a/exporters/sender/jdk/build.gradle.kts b/exporters/sender/jdk/build.gradle.kts index 2784bc66cb1..80fb0f870d3 100644 --- a/exporters/sender/jdk/build.gradle.kts +++ b/exporters/sender/jdk/build.gradle.kts @@ -18,3 +18,8 @@ tasks { options.release.set(11) } } + +tasks.test { + val testJavaVersion: String? by project + enabled = !testJavaVersion.equals("8") +} diff --git a/exporters/sender/jdk/src/main/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSender.java b/exporters/sender/jdk/src/main/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSender.java index 5739ffb5596..fda90c6a8c7 100644 --- a/exporters/sender/jdk/src/main/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSender.java +++ b/exporters/sender/jdk/src/main/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSender.java @@ -16,6 +16,7 @@ import java.net.http.HttpClient; import java.net.http.HttpRequest; import java.net.http.HttpResponse; +import java.net.http.HttpTimeoutException; import java.nio.ByteBuffer; import java.time.Duration; import java.util.Map; @@ -56,19 +57,16 @@ public final class JdkHttpSender implements HttpSender { private final Supplier> headerSupplier; @Nullable private final RetryPolicy retryPolicy; + // Visible for testing JdkHttpSender( + HttpClient client, String endpoint, boolean compressionEnabled, String contentType, long timeoutNanos, Supplier> headerSupplier, - @Nullable RetryPolicy retryPolicy, - @Nullable SSLContext sslContext) { - HttpClient.Builder builder = HttpClient.newBuilder().executor(executorService); - if (sslContext != null) { - builder.sslContext(sslContext); - } - this.client = builder.build(); + @Nullable RetryPolicy retryPolicy) { + this.client = client; try { this.uri = new URI(endpoint); } catch (URISyntaxException e) { @@ -81,6 +79,36 @@ public final class JdkHttpSender implements HttpSender { this.retryPolicy = retryPolicy; } + JdkHttpSender( + String endpoint, + boolean compressionEnabled, + String contentType, + long timeoutNanos, + Supplier> headerSupplier, + @Nullable RetryPolicy retryPolicy, + @Nullable SSLContext sslContext) { + this( + configureClient(sslContext), + endpoint, + compressionEnabled, + contentType, + timeoutNanos, + headerSupplier, + retryPolicy); + } + + private static HttpClient configureClient(@Nullable SSLContext sslContext) { + HttpClient.Builder builder = + HttpClient.newBuilder() + // Aligned with OkHttpClient default connect timeout + // TODO (jack-berg): Consider making connect timeout configurable + .connectTimeout(Duration.ofSeconds(10)); + if (sslContext != null) { + builder.sslContext(sslContext); + } + return builder.build(); + } + @Override public void send( Consumer marshaler, @@ -88,7 +116,15 @@ public void send( Consumer onResponse, Consumer onError) { CompletableFuture> unused = - CompletableFuture.supplyAsync(() -> sendInternal(marshaler), executorService) + CompletableFuture.supplyAsync( + () -> { + try { + return sendInternal(marshaler); + } catch (IOException e) { + throw new IllegalStateException(e); + } + }, + executorService) .whenComplete( (httpResponse, throwable) -> { if (throwable != null) { @@ -99,7 +135,8 @@ public void send( }); } - private HttpResponse sendInternal(Consumer marshaler) { + // Visible for testing + HttpResponse sendInternal(Consumer marshaler) throws IOException { long startTimeNanos = System.nanoTime(); HttpRequest.Builder requestBuilder = HttpRequest.newBuilder().uri(uri).timeout(Duration.ofNanos(timeoutNanos)); @@ -129,46 +166,64 @@ private HttpResponse sendInternal(Consumer marshaler) { long attempt = 0; long nextBackoffNanos = retryPolicy.getInitialBackoff().toNanos(); + HttpResponse httpResponse = null; + IOException exception = null; do { - requestBuilder.timeout(Duration.ofNanos(timeoutNanos - (System.nanoTime() - startTimeNanos))); - HttpResponse httpResponse = sendRequest(requestBuilder, byteBufferPool); - attempt++; - if (attempt >= retryPolicy.getMaxAttempts() - || !retryableStatusCodes.contains(httpResponse.statusCode())) { - return httpResponse; + if (attempt > 0) { + // Compute and sleep for backoff + long upperBoundNanos = Math.min(nextBackoffNanos, retryPolicy.getMaxBackoff().toNanos()); + long backoffNanos = ThreadLocalRandom.current().nextLong(upperBoundNanos); + nextBackoffNanos = (long) (nextBackoffNanos * retryPolicy.getBackoffMultiplier()); + try { + TimeUnit.NANOSECONDS.sleep(backoffNanos); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + break; // Break out and return response or throw + } + // If after sleeping we've exceeded timeoutNanos, break out and return response or throw + if ((System.nanoTime() - startTimeNanos) >= timeoutNanos) { + break; + } } - // Compute and sleep for backoff - long upperBoundNanos = Math.min(nextBackoffNanos, retryPolicy.getMaxBackoff().toNanos()); - long backoffNanos = ThreadLocalRandom.current().nextLong(upperBoundNanos); - nextBackoffNanos = (long) (nextBackoffNanos * retryPolicy.getBackoffMultiplier()); + attempt++; + requestBuilder.timeout(Duration.ofNanos(timeoutNanos - (System.nanoTime() - startTimeNanos))); try { - TimeUnit.NANOSECONDS.sleep(backoffNanos); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new IllegalStateException(e); + httpResponse = sendRequest(requestBuilder, byteBufferPool); + } catch (IOException e) { + exception = e; } - if ((System.nanoTime() - startTimeNanos) >= timeoutNanos) { + + if (httpResponse != null && !retryableStatusCodes.contains(httpResponse.statusCode())) { return httpResponse; } - } while (true); + if (exception != null && !isRetryableException(exception)) { + throw exception; + } + } while (attempt < retryPolicy.getMaxAttempts()); + + if (httpResponse != null) { + return httpResponse; + } + throw exception; } private HttpResponse sendRequest( - HttpRequest.Builder requestBuilder, ByteBufferPool byteBufferPool) { + HttpRequest.Builder requestBuilder, ByteBufferPool byteBufferPool) throws IOException { try { return client.send(requestBuilder.build(), HttpResponse.BodyHandlers.ofByteArray()); - } catch (IOException | InterruptedException e) { - if (e instanceof InterruptedException) { - Thread.currentThread().interrupt(); - } - // TODO: is throwable retryable? + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); throw new IllegalStateException(e); } finally { byteBufferPool.resetPool(); } } + private static boolean isRetryableException(IOException throwable) { + return throwable instanceof HttpTimeoutException; + } + private static class NoCopyByteArrayOutputStream extends ByteArrayOutputStream { NoCopyByteArrayOutputStream() { super(retryableStatusCodes.size()); diff --git a/exporters/sender/jdk/src/test/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSenderTest.java b/exporters/sender/jdk/src/test/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSenderTest.java new file mode 100644 index 00000000000..2691bb1ca3f --- /dev/null +++ b/exporters/sender/jdk/src/test/java/io/opentelemetry/exporter/sender/jdk/internal/JdkHttpSenderTest.java @@ -0,0 +1,95 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.exporter.sender.jdk.internal; + +import static org.assertj.core.api.Assertions.as; +import static org.assertj.core.api.AssertionsForClassTypes.assertThat; +import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import io.opentelemetry.sdk.common.export.RetryPolicy; +import java.io.IOException; +import java.net.http.HttpClient; +import java.net.http.HttpConnectTimeoutException; +import java.time.Duration; +import java.util.Collections; +import org.assertj.core.api.InstanceOfAssertFactories; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.mockito.junit.jupiter.MockitoSettings; +import org.mockito.quality.Strictness; + +@ExtendWith(MockitoExtension.class) +@MockitoSettings(strictness = Strictness.LENIENT) +class JdkHttpSenderTest { + + private final HttpClient realHttpClient = + HttpClient.newBuilder().connectTimeout(Duration.ofMillis(10)).build(); + @Mock private HttpClient mockHttpClient; + private JdkHttpSender sender; + + @BeforeEach + void setup() throws IOException, InterruptedException { + // Can't directly spy on HttpClient for some reason, so create a real instance and a mock that + // delegates to the real thing + when(mockHttpClient.send(any(), any())) + .thenAnswer( + invocation -> + realHttpClient.send(invocation.getArgument(0), invocation.getArgument(1))); + sender = + new JdkHttpSender( + mockHttpClient, + "http://10.255.255.1", // Connecting to a non-routable IP address to trigger connection + // timeout + false, + "text/plain", + Duration.ofSeconds(10).toNanos(), + Collections::emptyMap, + RetryPolicy.builder() + .setMaxAttempts(2) + .setInitialBackoff(Duration.ofMillis(1)) + .build()); + } + + @Test + void sendInternal_RetryableConnectTimeoutException() throws IOException, InterruptedException { + assertThatThrownBy(() -> sender.sendInternal(marshaler -> {})) + .isInstanceOf(HttpConnectTimeoutException.class); + + verify(mockHttpClient, times(2)).send(any(), any()); + } + + @Test + void sendInternal_NonRetryableException() throws IOException, InterruptedException { + doThrow(new IOException("unknown error")).when(mockHttpClient).send(any(), any()); + + assertThatThrownBy(() -> sender.sendInternal(marshaler -> {})) + .isInstanceOf(IOException.class) + .hasMessage("unknown error"); + + verify(mockHttpClient, times(1)).send(any(), any()); + } + + @Test + void defaultConnectTimeout() { + sender = + new JdkHttpSender( + "http://localhost", true, "text/plain", 1, Collections::emptyMap, null, null); + + assertThat(sender) + .extracting("client", as(InstanceOfAssertFactories.type(HttpClient.class))) + .satisfies( + httpClient -> + assertThat(httpClient.connectTimeout().get()).isEqualTo(Duration.ofSeconds(10))); + } +}