Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Retry on configurable exception #6991

Merged
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion docs/apidiffs/current_vs_latest/opentelemetry-sdk-common.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,13 @@
Comparing source compatibility of opentelemetry-sdk-common-1.47.0-SNAPSHOT.jar against opentelemetry-sdk-common-1.46.0.jar
No changes.
+++ NEW CLASS: PUBLIC(+) io.opentelemetry.sdk.common.export.DefaultRetryExceptionPredicate (not serializable)
+++ CLASS FILE FORMAT VERSION: 52.0 <- n.a.
+++ NEW INTERFACE: java.util.function.Predicate
+++ NEW SUPERCLASS: java.lang.Object
+++ NEW CONSTRUCTOR: PUBLIC(+) DefaultRetryExceptionPredicate()
+++ NEW METHOD: PUBLIC(+) boolean test(java.io.IOException)
**** MODIFIED CLASS: PUBLIC ABSTRACT io.opentelemetry.sdk.common.export.RetryPolicy (not serializable)
=== CLASS FILE FORMAT VERSION: 52.0 <- 52.0
+++* NEW METHOD: PUBLIC(+) ABSTRACT(+) java.util.function.Predicate<java.io.IOException> getRetryExceptionPredicate()
**** MODIFIED CLASS: PUBLIC ABSTRACT STATIC io.opentelemetry.sdk.common.export.RetryPolicy$RetryPolicyBuilder (not serializable)
=== CLASS FILE FORMAT VERSION: 52.0 <- 52.0
+++* NEW METHOD: PUBLIC(+) ABSTRACT(+) io.opentelemetry.sdk.common.export.RetryPolicy$RetryPolicyBuilder setRetryExceptionPredicate(java.util.function.Predicate<java.io.IOException>)
Original file line number Diff line number Diff line change
Expand Up @@ -1000,7 +1000,7 @@ void stringRepresentation() throws IOException, CertificateEncodingException {
+ ", "
+ "compressorEncoding=gzip, "
+ "headers=Headers\\{.*foo=OBFUSCATED.*\\}, "
+ "retryPolicy=RetryPolicy\\{maxAttempts=2, initialBackoff=PT0\\.05S, maxBackoff=PT3S, backoffMultiplier=1\\.3\\}"
+ "retryPolicy=RetryPolicy\\{maxAttempts=2, initialBackoff=PT0\\.05S, maxBackoff=PT3S, backoffMultiplier=1\\.3, retryExceptionPredicate=io.opentelemetry.sdk.common.export.DefaultRetryExceptionPredicate.*\\}"
+ ".*" // Maybe additional grpcChannel field, signal specific fields
+ "\\}");
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -904,7 +904,7 @@ void stringRepresentation() throws IOException, CertificateEncodingException {
+ ", "
+ "exportAsJson=false, "
+ "headers=Headers\\{.*foo=OBFUSCATED.*\\}, "
+ "retryPolicy=RetryPolicy\\{maxAttempts=2, initialBackoff=PT0\\.05S, maxBackoff=PT3S, backoffMultiplier=1\\.3\\}"
+ "retryPolicy=RetryPolicy\\{maxAttempts=2, initialBackoff=PT0\\.05S, maxBackoff=PT3S, backoffMultiplier=1\\.3, retryExceptionPredicate=io.opentelemetry.sdk.common.export.DefaultRetryExceptionPredicate.*\\}"
+ ".*" // Maybe additional signal specific fields
+ "\\}");
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public RetryInterceptor(RetryPolicy retryPolicy, Function<Response, Boolean> isR
this(
retryPolicy,
isRetryable,
RetryInterceptor::isRetryableException,
e -> retryPolicy.getRetryExceptionPredicate().test(e),
TimeUnit.NANOSECONDS::sleep,
bound -> ThreadLocalRandom.current().nextLong(bound));
}
Expand Down Expand Up @@ -144,6 +144,11 @@ private static String responseStringRepresentation(Response response) {
return joiner.toString();
}

// Visible for testing
boolean shouldRetryOnException(IOException e) {
return isRetryableException.apply(e);
}

// Visible for testing
static boolean isRetryableException(IOException e) {
if (e instanceof SocketTimeoutException) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,16 @@
import com.linecorp.armeria.common.HttpResponse;
import com.linecorp.armeria.common.HttpStatus;
import com.linecorp.armeria.testing.junit5.server.mock.MockWebServerExtension;
import io.opentelemetry.sdk.common.export.DefaultRetryExceptionPredicate;
import io.opentelemetry.sdk.common.export.RetryPolicy;
import java.io.IOException;
import java.net.ConnectException;
import java.net.HttpRetryException;
import java.net.ServerSocket;
import java.net.SocketTimeoutException;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Predicate;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
Expand All @@ -48,32 +50,40 @@ class RetryInterceptorTest {

@Mock private RetryInterceptor.Sleeper sleeper;
@Mock private RetryInterceptor.BoundedLongGenerator random;
private Function<IOException, Boolean> isRetryableException;
private Predicate<IOException> retryPredicate;

private RetryInterceptor retrier;
private OkHttpClient client;

@BeforeEach
void setUp() {
// Note: cannot replace this with lambda or method reference because we need to spy on it
isRetryableException =
DefaultRetryExceptionPredicate defaultRetryExceptionPredicate =
new DefaultRetryExceptionPredicate();
retryPredicate =
spy(
new Function<IOException, Boolean>() {
new Predicate<IOException>() {
@Override
public Boolean apply(IOException exception) {
return RetryInterceptor.isRetryableException(exception);
public boolean test(IOException e) {
return defaultRetryExceptionPredicate.test(e)
|| (e instanceof HttpRetryException
&& e.getMessage().contains("timeout retry"));
}
});

RetryPolicy retryPolicy =
RetryPolicy.builder()
.setBackoffMultiplier(1.6)
.setInitialBackoff(Duration.ofSeconds(1))
.setMaxBackoff(Duration.ofSeconds(2))
.setMaxAttempts(5)
.setRetryExceptionPredicate(retryPredicate)
.build();

retrier =
new RetryInterceptor(
RetryPolicy.builder()
.setBackoffMultiplier(1.6)
.setInitialBackoff(Duration.ofSeconds(1))
.setMaxBackoff(Duration.ofSeconds(2))
.setMaxAttempts(5)
.build(),
retryPolicy,
r -> !r.isSuccessful(),
isRetryableException,
e -> retryPolicy.getRetryExceptionPredicate().test(e),
sleeper,
random);
client = new OkHttpClient.Builder().addInterceptor(retrier).build();
Expand Down Expand Up @@ -154,7 +164,7 @@ void connectTimeout() throws Exception {
client.newCall(new Request.Builder().url("http://10.255.255.1").build()).execute())
.isInstanceOf(SocketTimeoutException.class);

verify(isRetryableException, times(5)).apply(any());
verify(retryPredicate, times(5)).test(any());
// Should retry maxAttempts, and sleep maxAttempts - 1 times
verify(sleeper, times(4)).sleep(anyLong());
}
Expand All @@ -174,7 +184,7 @@ void connectException() throws Exception {
.execute())
.isInstanceOfAny(ConnectException.class, SocketTimeoutException.class);

verify(isRetryableException, times(5)).apply(any());
verify(retryPredicate, times(5)).test(any());
// Should retry maxAttempts, and sleep maxAttempts - 1 times
verify(sleeper, times(4)).sleep(anyLong());
}
Expand All @@ -190,16 +200,16 @@ private static int freePort() {
@Test
void nonRetryableException() throws InterruptedException {
client = connectTimeoutClient();
// Override isRetryableException so that no exception is retryable
when(isRetryableException.apply(any())).thenReturn(false);
// Override retryPredicate so that no exception is retryable
when(retryPredicate.test(any())).thenReturn(false);

// Connecting to a non-routable IP address to trigger connection timeout
assertThatThrownBy(
() ->
client.newCall(new Request.Builder().url("http://10.255.255.1").build()).execute())
.isInstanceOf(SocketTimeoutException.class);

verify(isRetryableException, times(1)).apply(any());
verify(retryPredicate, times(1)).test(any());
verify(sleeper, never()).sleep(anyLong());
}

Expand All @@ -214,20 +224,51 @@ private OkHttpClient connectTimeoutClient() {
void isRetryableException() {
// Should retry on connection timeouts, where error message is "Connect timed out" or "connect
// timed out"
assertThat(
RetryInterceptor.isRetryableException(new SocketTimeoutException("Connect timed out")))
assertThat(retrier.shouldRetryOnException(new SocketTimeoutException("Connect timed out")))
.isTrue();
assertThat(
RetryInterceptor.isRetryableException(new SocketTimeoutException("connect timed out")))
assertThat(retrier.shouldRetryOnException(new SocketTimeoutException("connect timed out")))
.isTrue();
// Shouldn't retry on read timeouts, where error message is "Read timed out"
assertThat(RetryInterceptor.isRetryableException(new SocketTimeoutException("Read timed out")))
assertThat(retrier.shouldRetryOnException(new SocketTimeoutException("Read timed out")))
.isFalse();
// Shouldn't retry on write timeouts, where error message is "timeout", or other IOException
assertThat(RetryInterceptor.isRetryableException(new SocketTimeoutException("timeout")))
// Shouldn't retry on write timeouts or other IOException
assertThat(retrier.shouldRetryOnException(new SocketTimeoutException("timeout"))).isTrue();
assertThat(retrier.shouldRetryOnException(new SocketTimeoutException())).isTrue();
assertThat(retrier.shouldRetryOnException(new IOException("error"))).isFalse();

// Testing configured predicate
assertThat(retrier.shouldRetryOnException(new HttpRetryException("error", 400))).isFalse();
assertThat(retrier.shouldRetryOnException(new HttpRetryException("timeout retry", 400)))
.isTrue();
}

@Test
void isRetryableExceptionDefaultBehaviour() {
RetryInterceptor retryInterceptor =
new RetryInterceptor(RetryPolicy.getDefault(), OkHttpHttpSender::isRetryable);
assertThat(
retryInterceptor.shouldRetryOnException(
new SocketTimeoutException("Connect timed out")))
.isTrue();
assertThat(retryInterceptor.shouldRetryOnException(new IOException("Connect timed out")))
.isFalse();
}

@Test
void isRetryableExceptionCustomRetryPredicate() {
RetryInterceptor retryInterceptor =
new RetryInterceptor(
RetryPolicy.builder()
.setRetryExceptionPredicate((IOException e) -> e.getMessage().equals("retry"))
.build(),
OkHttpHttpSender::isRetryable);

assertThat(retryInterceptor.shouldRetryOnException(new IOException("some message"))).isFalse();
assertThat(retryInterceptor.shouldRetryOnException(new IOException("retry"))).isTrue();
assertThat(
retryInterceptor.shouldRetryOnException(
new SocketTimeoutException("Connect timed out")))
.isFalse();
assertThat(RetryInterceptor.isRetryableException(new SocketTimeoutException())).isTrue();
assertThat(RetryInterceptor.isRetryableException(new IOException("error"))).isFalse();
}

private Response sendRequest() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.sdk.common.export;

import java.io.IOException;
import java.net.ConnectException;
import java.net.SocketTimeoutException;
import java.util.Locale;
import java.util.function.Predicate;

public class DefaultRetryExceptionPredicate implements Predicate<IOException> {
@Override
public boolean test(IOException e) {
if (e instanceof SocketTimeoutException) {
String message = e.getMessage();
// Connect timeouts can produce SocketTimeoutExceptions with no message, or with "connect
// timed out", or timeout
if (message == null) {
return true;
}
message = message.toLowerCase(Locale.ROOT);
return message.contains("connect timed out") || message.contains("timeout");
} else if (e instanceof ConnectException) {
// Exceptions resemble: java.net.ConnectException: Failed to connect to
// localhost/[0:0:0:0:0:0:0:1]:62611
return true;
}
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@
import static io.opentelemetry.api.internal.Utils.checkArgument;

import com.google.auto.value.AutoValue;
import java.io.IOException;
import java.time.Duration;
import java.util.function.Predicate;

/**
* Configuration for exporter exponential retry policy.
Expand All @@ -28,6 +30,9 @@ public abstract class RetryPolicy {

private static final double DEFAULT_BACKOFF_MULTIPLIER = 1.5;

private static final Predicate<IOException> DEFAULT_RETRY_PREDICATE =
new DefaultRetryExceptionPredicate();

private static final RetryPolicy DEFAULT = RetryPolicy.builder().build();

RetryPolicy() {}
Expand All @@ -43,7 +48,8 @@ public static RetryPolicyBuilder builder() {
.setMaxAttempts(DEFAULT_MAX_ATTEMPTS)
.setInitialBackoff(Duration.ofSeconds(DEFAULT_INITIAL_BACKOFF_SECONDS))
.setMaxBackoff(Duration.ofSeconds(DEFAULT_MAX_BACKOFF_SECONDS))
.setBackoffMultiplier(DEFAULT_BACKOFF_MULTIPLIER);
.setBackoffMultiplier(DEFAULT_BACKOFF_MULTIPLIER)
.setRetryExceptionPredicate(DEFAULT_RETRY_PREDICATE);
}

/**
Expand All @@ -66,6 +72,9 @@ public static RetryPolicyBuilder builder() {
/** Returns the backoff multiplier. */
public abstract double getBackoffMultiplier();

/** Returns the predicate if exception is retryable. */
public abstract Predicate<IOException> getRetryExceptionPredicate();
jack-berg marked this conversation as resolved.
Show resolved Hide resolved

/** Builder for {@link RetryPolicy}. */
@AutoValue.Builder
public abstract static class RetryPolicyBuilder {
Expand Down Expand Up @@ -96,6 +105,13 @@ public abstract static class RetryPolicyBuilder {
*/
public abstract RetryPolicyBuilder setBackoffMultiplier(double backoffMultiplier);

/**
* Set the predicate to determine if retry should happen based on exception. No retry by
* default.
*/
public abstract RetryPolicyBuilder setRetryExceptionPredicate(
Predicate<IOException> retryExceptionPredicate);
Comment on lines +110 to +111
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

did you consider a customizer instead, to allow choice of either replacing or enhancing the default predicate?

I realize it's not the friendliest API, but we've found the pattern really useful in AutoConfigurationCustomizer

Suggested change
public abstract RetryPolicyBuilder setRetryExceptionPredicate(
Predicate<IOException> retryExceptionPredicate);
public abstract RetryPolicyBuilder setRetryExceptionPredicateCustomizer(
Fuction<Predicate<IOException>, Predicate<IOException>> retryExceptionCustomizer);

Copy link
Contributor Author

@YuriyHolinko YuriyHolinko Jan 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and later use something like this

    static class MyFunction implements Function<Predicate<IOException>, Predicate<IOException>> {
        private Predicate<IOException> newPredicate = e -> {
            return false; // logic for  my retryable condition
        };

        @Override
        public Predicate<IOException> apply(Predicate<IOException> defaultPredicate) {
            /**
             * use this statement to extend
             */
            return newPredicate.or(defaultPredicate); // extend


            /**
             * or this to override
             */
            return newPredicate;
        }
    }

it depends how many users will want to extend it. I started the PR quickly with a thought that I need to extend rertry policy, but after some time I realized if a user wants to change the default policy it's ok just to override.

but pls tell me if you think it's nice to have the enhance option for it and I will cover it in this PR and next PR.
Please note that I also have a plan to change the defaults for retryable exception.

Copy link
Member

@jack-berg jack-berg Jan 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The advantage of a predicate customizer is that you can do things like:

  • return defaultPredicate.and(..)
  • return defaultPredicate.or(..)

If you're just wholesale overwriting the default predicate, you don't get anything from having a customizer:

  • return myCustomPredicate

This benefit is somewhat diminished if we make the default retry predicates accessible as part of our stable API (related to #6970), in which case, the and and or cases can be achieved without a customizer by referencing the default predicate API.

I realize it's not the friendliest API

This is the key bit to me. I'd say that API ergonomics feel really foreign to someone who hasn't written a AutoConfigurationCustomizer. That pattern doesn't show up anywhere else in our API.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd say that API ergonomics feel really foreign to someone who hasn't written a AutoConfigurationCustomizer. That pattern doesn't show up anywhere else in our API.

good point


abstract RetryPolicy autoBuild();

/** Build and return a {@link RetryPolicy} with the values of this builder. */
Expand Down
Loading