From 587fa2819eb2df36dd03ecbf3b96483472a610b6 Mon Sep 17 00:00:00 2001 From: jerry Date: Fri, 17 Jan 2025 13:09:33 +0800 Subject: [PATCH] [rest] Add http conf and ExponentialHttpRetryInterceptor to handle retry In RESTCatalog (#4929) --- .../rest/ExponentialHttpRetryInterceptor.java | 175 ++++++++++++++++++ .../org/apache/paimon/rest/HttpClient.java | 29 ++- .../apache/paimon/rest/HttpClientOptions.java | 28 ++- .../paimon/rest/RESTCatalogOptions.java | 18 +- .../ExponentialHttpRetryInterceptorTest.java | 136 ++++++++++++++ .../apache/paimon/rest/HttpClientTest.java | 14 +- 6 files changed, 377 insertions(+), 23 deletions(-) create mode 100644 paimon-core/src/main/java/org/apache/paimon/rest/ExponentialHttpRetryInterceptor.java create mode 100644 paimon-core/src/test/java/org/apache/paimon/rest/ExponentialHttpRetryInterceptorTest.java diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/ExponentialHttpRetryInterceptor.java b/paimon-core/src/main/java/org/apache/paimon/rest/ExponentialHttpRetryInterceptor.java new file mode 100644 index 000000000000..dd16e47fc580 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/rest/ExponentialHttpRetryInterceptor.java @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.rest; + +import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableSet; +import org.apache.paimon.shade.guava30.com.google.common.net.HttpHeaders; + +import okhttp3.Interceptor; +import okhttp3.Request; +import okhttp3.Response; + +import javax.net.ssl.SSLException; + +import java.io.IOException; +import java.io.InterruptedIOException; +import java.net.ConnectException; +import java.net.NoRouteToHostException; +import java.net.UnknownHostException; +import java.util.Set; +import java.util.concurrent.ThreadLocalRandom; + +/** + * Defines exponential HTTP request retry interceptor. + * + *

The following retrievable IOException + * + *

+ * + *

The following retrievable HTTP status codes are defined: + * + *

+ * + *

The following retrievable HTTP method which is idempotent are defined: + * + *

+ */ +public class ExponentialHttpRetryInterceptor implements Interceptor { + + private final int maxRetries; + private final Set> nonRetriableExceptions; + private final Set retrievableCodes; + private final Set retrievableMethods; + + public ExponentialHttpRetryInterceptor(int maxRetries) { + this.maxRetries = maxRetries; + this.retrievableMethods = + ImmutableSet.of("GET", "HEAD", "PUT", "DELETE", "TRACE", "OPTIONS"); + this.retrievableCodes = ImmutableSet.of(429, 502, 503, 504); + this.nonRetriableExceptions = + ImmutableSet.of( + InterruptedIOException.class, + UnknownHostException.class, + ConnectException.class, + NoRouteToHostException.class, + SSLException.class); + } + + @Override + public Response intercept(Chain chain) throws IOException { + Request request = chain.request(); + Response response = null; + + for (int retryCount = 1; ; retryCount++) { + try { + response = chain.proceed(request); + } catch (IOException e) { + if (needRetry(request.method(), e, retryCount)) { + wait(response, retryCount); + continue; + } + } + if (needRetry(response, retryCount)) { + if (response != null) { + response.close(); + } + wait(response, retryCount); + } else { + return response; + } + } + } + + public boolean needRetry(Response response, int execCount) { + if (execCount > maxRetries) { + return false; + } + return response == null + || (!response.isSuccessful() && retrievableCodes.contains(response.code())); + } + + public boolean needRetry(String method, IOException e, int execCount) { + if (execCount > maxRetries) { + return false; + } + if (!retrievableMethods.contains(method)) { + return false; + } + if (nonRetriableExceptions.contains(e.getClass())) { + return false; + } else { + for (Class rejectException : nonRetriableExceptions) { + if (rejectException.isInstance(e)) { + return false; + } + } + } + return true; + } + + public long getRetryIntervalInMilliseconds(Response response, int execCount) { + // a server may send a 429 / 503 with a Retry-After header + // https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Retry-After + String retryAfterStrInSecond = + response == null ? null : response.header(HttpHeaders.RETRY_AFTER); + Long retryAfter = null; + if (retryAfterStrInSecond != null) { + try { + retryAfter = Long.parseLong(retryAfterStrInSecond) * 1000; + } catch (Throwable ignore) { + } + + if (retryAfter != null && retryAfter > 0) { + return retryAfter; + } + } + + int delayMillis = 1000 * (int) Math.min(Math.pow(2.0, (long) execCount - 1.0), 64.0); + int jitter = ThreadLocalRandom.current().nextInt(Math.max(1, (int) (delayMillis * 0.1))); + + return delayMillis + jitter; + } + + private void wait(Response response, int retryCount) throws InterruptedIOException { + try { + Thread.sleep(getRetryIntervalInMilliseconds(response, retryCount)); + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); + throw new InterruptedIOException(); + } + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/HttpClient.java b/paimon-core/src/main/java/org/apache/paimon/rest/HttpClient.java index 08d6c8a050a0..5a13a51ef7fa 100644 --- a/paimon-core/src/main/java/org/apache/paimon/rest/HttpClient.java +++ b/paimon-core/src/main/java/org/apache/paimon/rest/HttpClient.java @@ -26,6 +26,7 @@ import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException; +import okhttp3.ConnectionPool; import okhttp3.Dispatcher; import okhttp3.Headers; import okhttp3.MediaType; @@ -40,6 +41,7 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.TimeUnit; import static okhttp3.ConnectionSpec.CLEARTEXT; import static okhttp3.ConnectionSpec.COMPATIBLE_TLS; @@ -52,6 +54,7 @@ public class HttpClient implements RESTClient { private static final String THREAD_NAME = "REST-CATALOG-HTTP-CLIENT-THREAD-POOL"; private static final MediaType MEDIA_TYPE = MediaType.parse("application/json"); + private static final int CONNECTION_KEEP_ALIVE_DURATION_MS = 300_000; private final OkHttpClient okHttpClient; private final String uri; @@ -191,14 +194,30 @@ private static OkHttpClient createHttpClient(HttpClientOptions httpClientOptions BlockingQueue workQueue = new SynchronousQueue<>(); ExecutorService executorService = createCachedThreadPool(httpClientOptions.threadPoolSize(), THREAD_NAME, workQueue); - + ConnectionPool connectionPool = + new ConnectionPool( + httpClientOptions.maxConnections(), + CONNECTION_KEEP_ALIVE_DURATION_MS, + TimeUnit.MILLISECONDS); + Dispatcher dispatcher = new Dispatcher(executorService); + // set max requests per host use max connections + dispatcher.setMaxRequestsPerHost(httpClientOptions.maxConnections()); OkHttpClient.Builder builder = new OkHttpClient.Builder() - .dispatcher(new Dispatcher(executorService)) + .dispatcher(dispatcher) .retryOnConnectionFailure(true) - .connectionSpecs(Arrays.asList(MODERN_TLS, COMPATIBLE_TLS, CLEARTEXT)); - httpClientOptions.connectTimeout().ifPresent(builder::connectTimeout); - httpClientOptions.readTimeout().ifPresent(builder::readTimeout); + .connectionPool(connectionPool) + .connectionSpecs(Arrays.asList(MODERN_TLS, COMPATIBLE_TLS, CLEARTEXT)) + .addInterceptor( + new ExponentialHttpRetryInterceptor( + httpClientOptions.maxRetries())); + httpClientOptions + .connectTimeout() + .ifPresent( + timeoutDuration -> { + builder.connectTimeout(timeoutDuration); + builder.readTimeout(timeoutDuration); + }); return builder.build(); } diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/HttpClientOptions.java b/paimon-core/src/main/java/org/apache/paimon/rest/HttpClientOptions.java index 00ae1a529e89..548a98956821 100644 --- a/paimon-core/src/main/java/org/apache/paimon/rest/HttpClientOptions.java +++ b/paimon-core/src/main/java/org/apache/paimon/rest/HttpClientOptions.java @@ -30,26 +30,30 @@ public class HttpClientOptions { private final String uri; @Nullable private final Duration connectTimeout; - @Nullable private final Duration readTimeout; private final int threadPoolSize; + private final int maxConnections; + private final int maxRetries; public HttpClientOptions( String uri, @Nullable Duration connectTimeout, - @Nullable Duration readTimeout, - int threadPoolSize) { + int threadPoolSize, + int maxConnections, + int maxRetries) { this.uri = uri; this.connectTimeout = connectTimeout; - this.readTimeout = readTimeout; this.threadPoolSize = threadPoolSize; + this.maxConnections = maxConnections; + this.maxRetries = maxRetries; } public static HttpClientOptions create(Options options) { return new HttpClientOptions( options.get(RESTCatalogOptions.URI), options.get(RESTCatalogOptions.CONNECTION_TIMEOUT), - options.get(RESTCatalogOptions.READ_TIMEOUT), - options.get(RESTCatalogOptions.THREAD_POOL_SIZE)); + options.get(RESTCatalogOptions.THREAD_POOL_SIZE), + options.get(RESTCatalogOptions.MAX_CONNECTIONS), + options.get(RESTCatalogOptions.MAX_RETIES)); } public String uri() { @@ -60,11 +64,15 @@ public Optional connectTimeout() { return Optional.ofNullable(connectTimeout); } - public Optional readTimeout() { - return Optional.ofNullable(readTimeout); - } - public int threadPoolSize() { return threadPoolSize; } + + public int maxConnections() { + return maxConnections; + } + + public int maxRetries() { + return Math.max(maxRetries, 0); + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalogOptions.java b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalogOptions.java index 1af64def4f71..843228fa0707 100644 --- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalogOptions.java +++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalogOptions.java @@ -35,14 +35,20 @@ public class RESTCatalogOptions { public static final ConfigOption CONNECTION_TIMEOUT = ConfigOptions.key("rest.client.connection-timeout") .durationType() - .noDefaultValue() + .defaultValue(Duration.ofSeconds(180)) .withDescription("REST Catalog http client connect timeout."); - public static final ConfigOption READ_TIMEOUT = - ConfigOptions.key("rest.client.read-timeout") - .durationType() - .noDefaultValue() - .withDescription("REST Catalog http client read timeout."); + public static final ConfigOption MAX_CONNECTIONS = + ConfigOptions.key("rest.client.max-connections") + .intType() + .defaultValue(100) + .withDescription("REST Catalog http client's max connections."); + + public static final ConfigOption MAX_RETIES = + ConfigOptions.key("rest.client.max-retries") + .intType() + .defaultValue(5) + .withDescription("REST Catalog http client's max retry times."); public static final ConfigOption THREAD_POOL_SIZE = ConfigOptions.key("rest.client.num-threads") diff --git a/paimon-core/src/test/java/org/apache/paimon/rest/ExponentialHttpRetryInterceptorTest.java b/paimon-core/src/test/java/org/apache/paimon/rest/ExponentialHttpRetryInterceptorTest.java new file mode 100644 index 000000000000..6510371f2d27 --- /dev/null +++ b/paimon-core/src/test/java/org/apache/paimon/rest/ExponentialHttpRetryInterceptorTest.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.rest; + +import okhttp3.Protocol; +import okhttp3.Request; +import okhttp3.Response; +import org.apache.hc.core5.http.HttpHeaders; +import org.junit.jupiter.api.Test; + +import javax.net.ssl.SSLException; + +import java.io.IOException; +import java.io.InterruptedIOException; +import java.net.ConnectException; +import java.net.NoRouteToHostException; +import java.net.UnknownHostException; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for {@link ExponentialHttpRetryInterceptor}. */ +class ExponentialHttpRetryInterceptorTest { + + private final int maxRetries = 5; + private final ExponentialHttpRetryInterceptor interceptor = + new ExponentialHttpRetryInterceptor(maxRetries); + + @Test + void testNeedRetryByMethod() { + + assertThat(interceptor.needRetry("GET", new IOException(), 1)).isTrue(); + assertThat(interceptor.needRetry("HEAD", new IOException(), 1)).isTrue(); + assertThat(interceptor.needRetry("PUT", new IOException(), 1)).isTrue(); + assertThat(interceptor.needRetry("DELETE", new IOException(), 1)).isTrue(); + assertThat(interceptor.needRetry("TRACE", new IOException(), 1)).isTrue(); + assertThat(interceptor.needRetry("OPTIONS", new IOException(), 1)).isTrue(); + + assertThat(interceptor.needRetry("POST", new IOException(), 1)).isFalse(); + assertThat(interceptor.needRetry("PATCH", new IOException(), 1)).isFalse(); + assertThat(interceptor.needRetry("CONNECT", new IOException(), 1)).isFalse(); + assertThat(interceptor.needRetry("GET", new IOException(), maxRetries + 1)).isFalse(); + } + + @Test + void testNeedRetryByException() { + + assertThat(interceptor.needRetry("GET", new InterruptedIOException(), 1)).isFalse(); + assertThat(interceptor.needRetry("GET", new UnknownHostException(), 1)).isFalse(); + assertThat(interceptor.needRetry("GET", new ConnectException(), 1)).isFalse(); + assertThat(interceptor.needRetry("GET", new NoRouteToHostException(), 1)).isFalse(); + assertThat(interceptor.needRetry("GET", new SSLException("error"), 1)).isFalse(); + + assertThat(interceptor.needRetry("GET", new IOException("error"), 1)).isTrue(); + assertThat(interceptor.needRetry("GET", new IOException("error"), maxRetries + 1)) + .isFalse(); + } + + @Test + void testRetryByResponse() { + + assertThat(interceptor.needRetry(createResponse(429), 1)).isTrue(); + assertThat(interceptor.needRetry(createResponse(503), 1)).isTrue(); + assertThat(interceptor.needRetry(createResponse(502), 1)).isTrue(); + assertThat(interceptor.needRetry(createResponse(504), 1)).isTrue(); + + assertThat(interceptor.needRetry(createResponse(500), 1)).isFalse(); + assertThat(interceptor.needRetry(createResponse(404), 1)).isFalse(); + assertThat(interceptor.needRetry(createResponse(200), 1)).isFalse(); + } + + @Test + void invalidRetryAfterHeader() { + Response response = createResponse(429, "Stuff"); + + assertThat(interceptor.getRetryIntervalInMilliseconds(response, 3)).isBetween(4000L, 5000L); + } + + @Test + void validRetryAfterHeader() { + long retryAfter = 3; + Response response = createResponse(429, retryAfter + ""); + assertThat(interceptor.getRetryIntervalInMilliseconds(response, 3)) + .isEqualTo(retryAfter * 1000); + } + + @Test + void exponentialRetry() { + ExponentialHttpRetryInterceptor interceptor = new ExponentialHttpRetryInterceptor(10); + Response response = createResponse(429, "Stuff"); + + // note that the upper limit includes ~10% variability + assertThat(interceptor.getRetryIntervalInMilliseconds(response, 0)).isEqualTo(0); + assertThat(interceptor.getRetryIntervalInMilliseconds(response, 1)).isBetween(1000L, 2000L); + assertThat(interceptor.getRetryIntervalInMilliseconds(response, 2)).isBetween(2000L, 3000L); + assertThat(interceptor.getRetryIntervalInMilliseconds(response, 3)).isBetween(4000L, 5000L); + assertThat(interceptor.getRetryIntervalInMilliseconds(response, 4)).isBetween(8000L, 9000L); + assertThat(interceptor.getRetryIntervalInMilliseconds(response, 5)) + .isBetween(16000L, 18000L); + assertThat(interceptor.getRetryIntervalInMilliseconds(response, 6)) + .isBetween(32000L, 36000L); + assertThat(interceptor.getRetryIntervalInMilliseconds(response, 7)) + .isBetween(64000L, 72000L); + assertThat(interceptor.getRetryIntervalInMilliseconds(response, 10)) + .isBetween(64000L, 72000L); + } + + private static Response createResponse(int httpCode) { + return createResponse(httpCode, ""); + } + + private static Response createResponse(int httpCode, String retryAfter) { + return new Response.Builder() + .code(httpCode) + .message("message") + .protocol(Protocol.HTTP_1_1) + .request(new Request.Builder().url("http://localhost").build()) + .addHeader(HttpHeaders.RETRY_AFTER, retryAfter) + .build(); + } +} diff --git a/paimon-core/src/test/java/org/apache/paimon/rest/HttpClientTest.java b/paimon-core/src/test/java/org/apache/paimon/rest/HttpClientTest.java index 161dbaf3bb50..05078cf805f7 100644 --- a/paimon-core/src/test/java/org/apache/paimon/rest/HttpClientTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/rest/HttpClientTest.java @@ -56,8 +56,7 @@ public void setUp() throws Exception { server.start(); errorHandler = DefaultErrorHandler.getInstance(); HttpClientOptions httpClientOptions = - new HttpClientOptions( - server.getBaseUrl(), Duration.ofSeconds(3), Duration.ofSeconds(3), 1); + new HttpClientOptions(server.getBaseUrl(), Duration.ofSeconds(3), 1, 10, 2); mockResponseData = new MockRESTData(MOCK_PATH); mockResponseDataStr = server.createResponseBody(mockResponseData); errorResponseStr = @@ -116,4 +115,15 @@ public void testDeleteFail() { server.enqueueResponse(errorResponseStr, 400); assertThrows(BadRequestException.class, () -> httpClient.delete(MOCK_PATH, headers)); } + + @Test + public void testRetry() { + HttpClient httpClient = + new HttpClient( + new HttpClientOptions( + server.getBaseUrl(), Duration.ofSeconds(30), 1, 10, 2)); + server.enqueueResponse(mockResponseDataStr, 429); + server.enqueueResponse(mockResponseDataStr, 200); + assertDoesNotThrow(() -> httpClient.get(MOCK_PATH, MockRESTData.class, headers)); + } }