diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/ExponentialHttpRetryInterceptor.java b/paimon-core/src/main/java/org/apache/paimon/rest/ExponentialHttpRetryInterceptor.java
new file mode 100644
index 000000000000..dd16e47fc580
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/ExponentialHttpRetryInterceptor.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.rest;
+
+import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableSet;
+import org.apache.paimon.shade.guava30.com.google.common.net.HttpHeaders;
+
+import okhttp3.Interceptor;
+import okhttp3.Request;
+import okhttp3.Response;
+
+import javax.net.ssl.SSLException;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.net.ConnectException;
+import java.net.NoRouteToHostException;
+import java.net.UnknownHostException;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * Defines exponential HTTP request retry interceptor.
+ *
+ *
The following retrievable IOException
+ *
+ *
+ * - InterruptedIOException
+ *
- UnknownHostException
+ *
- ConnectException
+ *
- NoRouteToHostException
+ *
- SSLException
+ *
+ *
+ * The following retrievable HTTP status codes are defined:
+ *
+ *
+ * - TOO_MANY_REQUESTS (429)
+ *
- BAD_GATEWAY (502)
+ *
- SERVICE_UNAVAILABLE (503)
+ *
- GATEWAY_TIMEOUT (504)
+ *
+ *
+ * The following retrievable HTTP method which is idempotent are defined:
+ *
+ *
+ * - GET
+ *
- HEAD
+ *
- PUT
+ *
- DELETE
+ *
- TRACE
+ *
- OPTIONS
+ *
+ */
+public class ExponentialHttpRetryInterceptor implements Interceptor {
+
+ private final int maxRetries;
+ private final Set> nonRetriableExceptions;
+ private final Set retrievableCodes;
+ private final Set retrievableMethods;
+
+ public ExponentialHttpRetryInterceptor(int maxRetries) {
+ this.maxRetries = maxRetries;
+ this.retrievableMethods =
+ ImmutableSet.of("GET", "HEAD", "PUT", "DELETE", "TRACE", "OPTIONS");
+ this.retrievableCodes = ImmutableSet.of(429, 502, 503, 504);
+ this.nonRetriableExceptions =
+ ImmutableSet.of(
+ InterruptedIOException.class,
+ UnknownHostException.class,
+ ConnectException.class,
+ NoRouteToHostException.class,
+ SSLException.class);
+ }
+
+ @Override
+ public Response intercept(Chain chain) throws IOException {
+ Request request = chain.request();
+ Response response = null;
+
+ for (int retryCount = 1; ; retryCount++) {
+ try {
+ response = chain.proceed(request);
+ } catch (IOException e) {
+ if (needRetry(request.method(), e, retryCount)) {
+ wait(response, retryCount);
+ continue;
+ }
+ }
+ if (needRetry(response, retryCount)) {
+ if (response != null) {
+ response.close();
+ }
+ wait(response, retryCount);
+ } else {
+ return response;
+ }
+ }
+ }
+
+ public boolean needRetry(Response response, int execCount) {
+ if (execCount > maxRetries) {
+ return false;
+ }
+ return response == null
+ || (!response.isSuccessful() && retrievableCodes.contains(response.code()));
+ }
+
+ public boolean needRetry(String method, IOException e, int execCount) {
+ if (execCount > maxRetries) {
+ return false;
+ }
+ if (!retrievableMethods.contains(method)) {
+ return false;
+ }
+ if (nonRetriableExceptions.contains(e.getClass())) {
+ return false;
+ } else {
+ for (Class extends IOException> rejectException : nonRetriableExceptions) {
+ if (rejectException.isInstance(e)) {
+ return false;
+ }
+ }
+ }
+ return true;
+ }
+
+ public long getRetryIntervalInMilliseconds(Response response, int execCount) {
+ // a server may send a 429 / 503 with a Retry-After header
+ // https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Retry-After
+ String retryAfterStrInSecond =
+ response == null ? null : response.header(HttpHeaders.RETRY_AFTER);
+ Long retryAfter = null;
+ if (retryAfterStrInSecond != null) {
+ try {
+ retryAfter = Long.parseLong(retryAfterStrInSecond) * 1000;
+ } catch (Throwable ignore) {
+ }
+
+ if (retryAfter != null && retryAfter > 0) {
+ return retryAfter;
+ }
+ }
+
+ int delayMillis = 1000 * (int) Math.min(Math.pow(2.0, (long) execCount - 1.0), 64.0);
+ int jitter = ThreadLocalRandom.current().nextInt(Math.max(1, (int) (delayMillis * 0.1)));
+
+ return delayMillis + jitter;
+ }
+
+ private void wait(Response response, int retryCount) throws InterruptedIOException {
+ try {
+ Thread.sleep(getRetryIntervalInMilliseconds(response, retryCount));
+ } catch (final InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new InterruptedIOException();
+ }
+ }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/HttpClient.java b/paimon-core/src/main/java/org/apache/paimon/rest/HttpClient.java
index 08d6c8a050a0..5a13a51ef7fa 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/HttpClient.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/HttpClient.java
@@ -26,6 +26,7 @@
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import okhttp3.ConnectionPool;
import okhttp3.Dispatcher;
import okhttp3.Headers;
import okhttp3.MediaType;
@@ -40,6 +41,7 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.TimeUnit;
import static okhttp3.ConnectionSpec.CLEARTEXT;
import static okhttp3.ConnectionSpec.COMPATIBLE_TLS;
@@ -52,6 +54,7 @@ public class HttpClient implements RESTClient {
private static final String THREAD_NAME = "REST-CATALOG-HTTP-CLIENT-THREAD-POOL";
private static final MediaType MEDIA_TYPE = MediaType.parse("application/json");
+ private static final int CONNECTION_KEEP_ALIVE_DURATION_MS = 300_000;
private final OkHttpClient okHttpClient;
private final String uri;
@@ -191,14 +194,30 @@ private static OkHttpClient createHttpClient(HttpClientOptions httpClientOptions
BlockingQueue workQueue = new SynchronousQueue<>();
ExecutorService executorService =
createCachedThreadPool(httpClientOptions.threadPoolSize(), THREAD_NAME, workQueue);
-
+ ConnectionPool connectionPool =
+ new ConnectionPool(
+ httpClientOptions.maxConnections(),
+ CONNECTION_KEEP_ALIVE_DURATION_MS,
+ TimeUnit.MILLISECONDS);
+ Dispatcher dispatcher = new Dispatcher(executorService);
+ // set max requests per host use max connections
+ dispatcher.setMaxRequestsPerHost(httpClientOptions.maxConnections());
OkHttpClient.Builder builder =
new OkHttpClient.Builder()
- .dispatcher(new Dispatcher(executorService))
+ .dispatcher(dispatcher)
.retryOnConnectionFailure(true)
- .connectionSpecs(Arrays.asList(MODERN_TLS, COMPATIBLE_TLS, CLEARTEXT));
- httpClientOptions.connectTimeout().ifPresent(builder::connectTimeout);
- httpClientOptions.readTimeout().ifPresent(builder::readTimeout);
+ .connectionPool(connectionPool)
+ .connectionSpecs(Arrays.asList(MODERN_TLS, COMPATIBLE_TLS, CLEARTEXT))
+ .addInterceptor(
+ new ExponentialHttpRetryInterceptor(
+ httpClientOptions.maxRetries()));
+ httpClientOptions
+ .connectTimeout()
+ .ifPresent(
+ timeoutDuration -> {
+ builder.connectTimeout(timeoutDuration);
+ builder.readTimeout(timeoutDuration);
+ });
return builder.build();
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/HttpClientOptions.java b/paimon-core/src/main/java/org/apache/paimon/rest/HttpClientOptions.java
index 00ae1a529e89..548a98956821 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/HttpClientOptions.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/HttpClientOptions.java
@@ -30,26 +30,30 @@ public class HttpClientOptions {
private final String uri;
@Nullable private final Duration connectTimeout;
- @Nullable private final Duration readTimeout;
private final int threadPoolSize;
+ private final int maxConnections;
+ private final int maxRetries;
public HttpClientOptions(
String uri,
@Nullable Duration connectTimeout,
- @Nullable Duration readTimeout,
- int threadPoolSize) {
+ int threadPoolSize,
+ int maxConnections,
+ int maxRetries) {
this.uri = uri;
this.connectTimeout = connectTimeout;
- this.readTimeout = readTimeout;
this.threadPoolSize = threadPoolSize;
+ this.maxConnections = maxConnections;
+ this.maxRetries = maxRetries;
}
public static HttpClientOptions create(Options options) {
return new HttpClientOptions(
options.get(RESTCatalogOptions.URI),
options.get(RESTCatalogOptions.CONNECTION_TIMEOUT),
- options.get(RESTCatalogOptions.READ_TIMEOUT),
- options.get(RESTCatalogOptions.THREAD_POOL_SIZE));
+ options.get(RESTCatalogOptions.THREAD_POOL_SIZE),
+ options.get(RESTCatalogOptions.MAX_CONNECTIONS),
+ options.get(RESTCatalogOptions.MAX_RETIES));
}
public String uri() {
@@ -60,11 +64,15 @@ public Optional connectTimeout() {
return Optional.ofNullable(connectTimeout);
}
- public Optional readTimeout() {
- return Optional.ofNullable(readTimeout);
- }
-
public int threadPoolSize() {
return threadPoolSize;
}
+
+ public int maxConnections() {
+ return maxConnections;
+ }
+
+ public int maxRetries() {
+ return Math.max(maxRetries, 0);
+ }
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalogOptions.java b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalogOptions.java
index 1af64def4f71..843228fa0707 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalogOptions.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalogOptions.java
@@ -35,14 +35,20 @@ public class RESTCatalogOptions {
public static final ConfigOption CONNECTION_TIMEOUT =
ConfigOptions.key("rest.client.connection-timeout")
.durationType()
- .noDefaultValue()
+ .defaultValue(Duration.ofSeconds(180))
.withDescription("REST Catalog http client connect timeout.");
- public static final ConfigOption READ_TIMEOUT =
- ConfigOptions.key("rest.client.read-timeout")
- .durationType()
- .noDefaultValue()
- .withDescription("REST Catalog http client read timeout.");
+ public static final ConfigOption MAX_CONNECTIONS =
+ ConfigOptions.key("rest.client.max-connections")
+ .intType()
+ .defaultValue(100)
+ .withDescription("REST Catalog http client's max connections.");
+
+ public static final ConfigOption MAX_RETIES =
+ ConfigOptions.key("rest.client.max-retries")
+ .intType()
+ .defaultValue(5)
+ .withDescription("REST Catalog http client's max retry times.");
public static final ConfigOption THREAD_POOL_SIZE =
ConfigOptions.key("rest.client.num-threads")
diff --git a/paimon-core/src/test/java/org/apache/paimon/rest/ExponentialHttpRetryInterceptorTest.java b/paimon-core/src/test/java/org/apache/paimon/rest/ExponentialHttpRetryInterceptorTest.java
new file mode 100644
index 000000000000..6510371f2d27
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/ExponentialHttpRetryInterceptorTest.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.rest;
+
+import okhttp3.Protocol;
+import okhttp3.Request;
+import okhttp3.Response;
+import org.apache.hc.core5.http.HttpHeaders;
+import org.junit.jupiter.api.Test;
+
+import javax.net.ssl.SSLException;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.net.ConnectException;
+import java.net.NoRouteToHostException;
+import java.net.UnknownHostException;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link ExponentialHttpRetryInterceptor}. */
+class ExponentialHttpRetryInterceptorTest {
+
+ private final int maxRetries = 5;
+ private final ExponentialHttpRetryInterceptor interceptor =
+ new ExponentialHttpRetryInterceptor(maxRetries);
+
+ @Test
+ void testNeedRetryByMethod() {
+
+ assertThat(interceptor.needRetry("GET", new IOException(), 1)).isTrue();
+ assertThat(interceptor.needRetry("HEAD", new IOException(), 1)).isTrue();
+ assertThat(interceptor.needRetry("PUT", new IOException(), 1)).isTrue();
+ assertThat(interceptor.needRetry("DELETE", new IOException(), 1)).isTrue();
+ assertThat(interceptor.needRetry("TRACE", new IOException(), 1)).isTrue();
+ assertThat(interceptor.needRetry("OPTIONS", new IOException(), 1)).isTrue();
+
+ assertThat(interceptor.needRetry("POST", new IOException(), 1)).isFalse();
+ assertThat(interceptor.needRetry("PATCH", new IOException(), 1)).isFalse();
+ assertThat(interceptor.needRetry("CONNECT", new IOException(), 1)).isFalse();
+ assertThat(interceptor.needRetry("GET", new IOException(), maxRetries + 1)).isFalse();
+ }
+
+ @Test
+ void testNeedRetryByException() {
+
+ assertThat(interceptor.needRetry("GET", new InterruptedIOException(), 1)).isFalse();
+ assertThat(interceptor.needRetry("GET", new UnknownHostException(), 1)).isFalse();
+ assertThat(interceptor.needRetry("GET", new ConnectException(), 1)).isFalse();
+ assertThat(interceptor.needRetry("GET", new NoRouteToHostException(), 1)).isFalse();
+ assertThat(interceptor.needRetry("GET", new SSLException("error"), 1)).isFalse();
+
+ assertThat(interceptor.needRetry("GET", new IOException("error"), 1)).isTrue();
+ assertThat(interceptor.needRetry("GET", new IOException("error"), maxRetries + 1))
+ .isFalse();
+ }
+
+ @Test
+ void testRetryByResponse() {
+
+ assertThat(interceptor.needRetry(createResponse(429), 1)).isTrue();
+ assertThat(interceptor.needRetry(createResponse(503), 1)).isTrue();
+ assertThat(interceptor.needRetry(createResponse(502), 1)).isTrue();
+ assertThat(interceptor.needRetry(createResponse(504), 1)).isTrue();
+
+ assertThat(interceptor.needRetry(createResponse(500), 1)).isFalse();
+ assertThat(interceptor.needRetry(createResponse(404), 1)).isFalse();
+ assertThat(interceptor.needRetry(createResponse(200), 1)).isFalse();
+ }
+
+ @Test
+ void invalidRetryAfterHeader() {
+ Response response = createResponse(429, "Stuff");
+
+ assertThat(interceptor.getRetryIntervalInMilliseconds(response, 3)).isBetween(4000L, 5000L);
+ }
+
+ @Test
+ void validRetryAfterHeader() {
+ long retryAfter = 3;
+ Response response = createResponse(429, retryAfter + "");
+ assertThat(interceptor.getRetryIntervalInMilliseconds(response, 3))
+ .isEqualTo(retryAfter * 1000);
+ }
+
+ @Test
+ void exponentialRetry() {
+ ExponentialHttpRetryInterceptor interceptor = new ExponentialHttpRetryInterceptor(10);
+ Response response = createResponse(429, "Stuff");
+
+ // note that the upper limit includes ~10% variability
+ assertThat(interceptor.getRetryIntervalInMilliseconds(response, 0)).isEqualTo(0);
+ assertThat(interceptor.getRetryIntervalInMilliseconds(response, 1)).isBetween(1000L, 2000L);
+ assertThat(interceptor.getRetryIntervalInMilliseconds(response, 2)).isBetween(2000L, 3000L);
+ assertThat(interceptor.getRetryIntervalInMilliseconds(response, 3)).isBetween(4000L, 5000L);
+ assertThat(interceptor.getRetryIntervalInMilliseconds(response, 4)).isBetween(8000L, 9000L);
+ assertThat(interceptor.getRetryIntervalInMilliseconds(response, 5))
+ .isBetween(16000L, 18000L);
+ assertThat(interceptor.getRetryIntervalInMilliseconds(response, 6))
+ .isBetween(32000L, 36000L);
+ assertThat(interceptor.getRetryIntervalInMilliseconds(response, 7))
+ .isBetween(64000L, 72000L);
+ assertThat(interceptor.getRetryIntervalInMilliseconds(response, 10))
+ .isBetween(64000L, 72000L);
+ }
+
+ private static Response createResponse(int httpCode) {
+ return createResponse(httpCode, "");
+ }
+
+ private static Response createResponse(int httpCode, String retryAfter) {
+ return new Response.Builder()
+ .code(httpCode)
+ .message("message")
+ .protocol(Protocol.HTTP_1_1)
+ .request(new Request.Builder().url("http://localhost").build())
+ .addHeader(HttpHeaders.RETRY_AFTER, retryAfter)
+ .build();
+ }
+}
diff --git a/paimon-core/src/test/java/org/apache/paimon/rest/HttpClientTest.java b/paimon-core/src/test/java/org/apache/paimon/rest/HttpClientTest.java
index 161dbaf3bb50..05078cf805f7 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/HttpClientTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/HttpClientTest.java
@@ -56,8 +56,7 @@ public void setUp() throws Exception {
server.start();
errorHandler = DefaultErrorHandler.getInstance();
HttpClientOptions httpClientOptions =
- new HttpClientOptions(
- server.getBaseUrl(), Duration.ofSeconds(3), Duration.ofSeconds(3), 1);
+ new HttpClientOptions(server.getBaseUrl(), Duration.ofSeconds(3), 1, 10, 2);
mockResponseData = new MockRESTData(MOCK_PATH);
mockResponseDataStr = server.createResponseBody(mockResponseData);
errorResponseStr =
@@ -116,4 +115,15 @@ public void testDeleteFail() {
server.enqueueResponse(errorResponseStr, 400);
assertThrows(BadRequestException.class, () -> httpClient.delete(MOCK_PATH, headers));
}
+
+ @Test
+ public void testRetry() {
+ HttpClient httpClient =
+ new HttpClient(
+ new HttpClientOptions(
+ server.getBaseUrl(), Duration.ofSeconds(30), 1, 10, 2));
+ server.enqueueResponse(mockResponseDataStr, 429);
+ server.enqueueResponse(mockResponseDataStr, 200);
+ assertDoesNotThrow(() -> httpClient.get(MOCK_PATH, MockRESTData.class, headers));
+ }
}