diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/ThreadPoolUtils.java b/paimon-common/src/main/java/org/apache/paimon/utils/ThreadPoolUtils.java index c64b9e26ea6e..e4b3da8ca8c3 100644 --- a/paimon-common/src/main/java/org/apache/paimon/utils/ThreadPoolUtils.java +++ b/paimon-common/src/main/java/org/apache/paimon/utils/ThreadPoolUtils.java @@ -20,7 +20,6 @@ import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators; import org.apache.paimon.shade.guava30.com.google.common.collect.Lists; -import org.apache.paimon.shade.guava30.com.google.common.util.concurrent.ThreadFactoryBuilder; import javax.annotation.Nullable; @@ -81,9 +80,7 @@ public static ThreadPoolExecutor createCachedThreadPool( public static ScheduledExecutorService createScheduledThreadPool( int threadNum, String namePrefix) { - return new ScheduledThreadPoolExecutor( - threadNum, - new ThreadFactoryBuilder().setDaemon(true).setNameFormat(namePrefix).build()); + return new ScheduledThreadPoolExecutor(threadNum, newDaemonThreadFactory(namePrefix)); } /** This method aims to parallel process tasks with memory control and sequentially. */ diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java index e18946b3374b..f3007bf4bf02 100644 --- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java @@ -47,15 +47,16 @@ /** A catalog implementation for REST. */ public class RESTCatalog implements Catalog { - private RESTClient client; - private ResourcePaths resourcePaths; - private Map options; - private Map baseHeader; - // a lazy thread pool for token refresh + + private static final ObjectMapper OBJECT_MAPPER = RESTObjectMapper.create(); + + private final RESTClient client; + private final ResourcePaths resourcePaths; + private final Map options; + private final Map baseHeader; private final AuthSession catalogAuth; - private volatile ScheduledExecutorService refreshExecutor = null; - private static final ObjectMapper objectMapper = RESTObjectMapper.create(); + private volatile ScheduledExecutorService refreshExecutor = null; public RESTCatalog(Options options) { if (options.getOptional(CatalogOptions.WAREHOUSE).isPresent()) { @@ -71,7 +72,7 @@ public RESTCatalog(Options options) { uri, connectTimeout, readTimeout, - objectMapper, + OBJECT_MAPPER, threadPoolSize, DefaultErrorHandler.getInstance()); this.client = new HttpClient(httpClientOptions); @@ -194,7 +195,14 @@ public boolean allowUpperCase() { } @Override - public void close() throws Exception {} + public void close() throws Exception { + if (refreshExecutor != null) { + refreshExecutor.shutdownNow(); + } + if (client != null) { + client.close(); + } + } @VisibleForTesting Map fetchOptionsFromServer( diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalogOptions.java b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalogOptions.java index 8f7bea91dcd3..1af64def4f71 100644 --- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalogOptions.java +++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalogOptions.java @@ -25,31 +25,37 @@ /** Options for REST Catalog. */ public class RESTCatalogOptions { + public static final ConfigOption URI = ConfigOptions.key("uri") .stringType() .noDefaultValue() .withDescription("REST Catalog server's uri."); + public static final ConfigOption CONNECTION_TIMEOUT = ConfigOptions.key("rest.client.connection-timeout") .durationType() .noDefaultValue() .withDescription("REST Catalog http client connect timeout."); + public static final ConfigOption READ_TIMEOUT = ConfigOptions.key("rest.client.read-timeout") .durationType() .noDefaultValue() .withDescription("REST Catalog http client read timeout."); + public static final ConfigOption THREAD_POOL_SIZE = ConfigOptions.key("rest.client.num-threads") .intType() .defaultValue(1) .withDescription("REST Catalog http client thread num."); + public static final ConfigOption TOKEN = ConfigOptions.key("token") .stringType() .noDefaultValue() .withDescription("REST Catalog auth token."); + public static final ConfigOption TOKEN_EXPIRATION_TIME = ConfigOptions.key("token.expiration-time") .durationType() @@ -59,6 +65,7 @@ public class RESTCatalogOptions { + " the token expires time is t2, we need to guarantee that t2 > t1," + " the token validity time is [t2 - t1, t2]," + " and the expires time defined here needs to be less than (t2 - t1)"); + public static final ConfigOption TOKEN_PROVIDER_PATH = ConfigOptions.key("token.provider.path") .stringType() diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/ResourcePaths.java b/paimon-core/src/main/java/org/apache/paimon/rest/ResourcePaths.java index 1fad87588a33..aaca6193802d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/rest/ResourcePaths.java +++ b/paimon-core/src/main/java/org/apache/paimon/rest/ResourcePaths.java @@ -20,6 +20,7 @@ /** Resource paths for REST catalog. */ public class ResourcePaths { + public static final String V1_CONFIG = "/api/v1/config"; public static ResourcePaths forCatalogProperties(String prefix) { diff --git a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java index 3ed8730862ee..f3f56e97215f 100644 --- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java @@ -36,9 +36,9 @@ /** Test for REST Catalog. */ public class RESTCatalogTest { + private MockWebServer mockWebServer; private RESTCatalog restCatalog; - private final String initToken = "init_token"; @Before public void setUp() throws IOException { @@ -47,6 +47,7 @@ public void setUp() throws IOException { String baseUrl = mockWebServer.url("").toString(); Options options = new Options(); options.set(RESTCatalogOptions.URI, baseUrl); + String initToken = "init_token"; options.set(RESTCatalogOptions.TOKEN, initToken); options.set(RESTCatalogOptions.THREAD_POOL_SIZE, 1); mockOptions(RESTCatalogInternalOptions.PREFIX.key(), "prefix");