diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java index efc0074a1ce9..587fc7475c40 100644 --- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java @@ -23,7 +23,6 @@ import org.apache.paimon.catalog.Identifier; import org.apache.paimon.catalog.PropertyChange; import org.apache.paimon.fs.FileIO; -import org.apache.paimon.options.CatalogOptions; import org.apache.paimon.options.Options; import org.apache.paimon.rest.auth.AuthSession; import org.apache.paimon.rest.auth.CredentialsProvider; @@ -73,32 +72,28 @@ public class RESTCatalog extends AbstractCatalog { private volatile ScheduledExecutorService refreshExecutor = null; public RESTCatalog(Options options) { - this(null, options); + this(options, getClient(options), getCredentialsProvider(options)); } - public RESTCatalog(FileIO fileIO, Options options) { - super(fileIO, options); - if (options.getOptional(CatalogOptions.WAREHOUSE).isPresent()) { - throw new IllegalArgumentException("Can not config warehouse in RESTCatalog."); - } - String uri = options.get(RESTCatalogOptions.URI); - Optional connectTimeout = - options.getOptional(RESTCatalogOptions.CONNECTION_TIMEOUT); - Optional readTimeout = options.getOptional(RESTCatalogOptions.READ_TIMEOUT); - Integer threadPoolSize = options.get(RESTCatalogOptions.THREAD_POOL_SIZE); - HttpClientOptions httpClientOptions = - new HttpClientOptions( - uri, - connectTimeout, - readTimeout, - OBJECT_MAPPER, - threadPoolSize, - DefaultErrorHandler.getInstance()); - this.client = new HttpClient(httpClientOptions); - this.baseHeader = configHeaders(options.toMap()); - CredentialsProvider credentialsProvider = - CredentialsProviderFactory.createCredentialsProvider( - options, RESTCatalog.class.getClassLoader()); + public RESTCatalog( + Options options, RESTClient client, CredentialsProvider credentialsProvider) { + this( + client, + credentialsProvider, + new Options( + fetchOptionsFromServer( + client, + RESTUtil.merge( + configHeaders(options.toMap()), + credentialsProvider.authHeader()), + options.toMap()))); + } + + public RESTCatalog( + RESTClient client, CredentialsProvider credentialsProvider, Options optionsWithServer) { + super(getFileIOFromOptions(optionsWithServer), optionsWithServer); + this.client = client; + this.baseHeader = configHeaders(optionsWithServer.toMap()); if (credentialsProvider.keepRefreshed()) { this.catalogAuth = AuthSession.fromRefreshCredentialsProvider( @@ -107,9 +102,7 @@ public RESTCatalog(FileIO fileIO, Options options) { } else { this.catalogAuth = new AuthSession(this.baseHeader, credentialsProvider); } - Map initHeaders = - RESTUtil.merge(configHeaders(options.toMap()), this.catalogAuth.getHeaders()); - this.options = new Options(fetchOptionsFromServer(initHeaders, options.toMap())); + this.options = optionsWithServer; this.resourcePaths = ResourcePaths.forCatalogProperties( this.options.get(RESTCatalogInternalOptions.PREFIX)); @@ -234,13 +227,40 @@ public void close() throws Exception { } @VisibleForTesting - Map fetchOptionsFromServer( - Map headers, Map clientProperties) { + static Map fetchOptionsFromServer( + RESTClient client, Map headers, Map clientProperties) { ConfigResponse response = client.get(ResourcePaths.V1_CONFIG, ConfigResponse.class, headers); return response.merge(clientProperties); } + @VisibleForTesting + static RESTClient getClient(Options options) { + String uri = options.get(RESTCatalogOptions.URI); + Optional connectTimeout = + options.getOptional(RESTCatalogOptions.CONNECTION_TIMEOUT); + Optional readTimeout = options.getOptional(RESTCatalogOptions.READ_TIMEOUT); + Integer threadPoolSize = options.get(RESTCatalogOptions.THREAD_POOL_SIZE); + HttpClientOptions httpClientOptions = + new HttpClientOptions( + uri, + connectTimeout, + readTimeout, + OBJECT_MAPPER, + threadPoolSize, + DefaultErrorHandler.getInstance()); + return new HttpClient(httpClientOptions); + } + + private static FileIO getFileIOFromOptions(Options options) { + return null; + } + + private static CredentialsProvider getCredentialsProvider(Options options) { + return CredentialsProviderFactory.createCredentialsProvider( + options, RESTCatalog.class.getClassLoader()); + } + private static Map configHeaders(Map properties) { return RESTUtil.extractPrefixMap(properties, "header."); } diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalogFactory.java b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalogFactory.java index a5c773cb4bd5..0d8a0fcd9014 100644 --- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalogFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalogFactory.java @@ -21,6 +21,8 @@ import org.apache.paimon.catalog.Catalog; import org.apache.paimon.catalog.CatalogContext; import org.apache.paimon.catalog.CatalogFactory; +import org.apache.paimon.options.CatalogOptions; +import org.apache.paimon.options.Options; /** Factory to create {@link RESTCatalog}. */ public class RESTCatalogFactory implements CatalogFactory { @@ -33,6 +35,10 @@ public String identifier() { @Override public Catalog create(CatalogContext context) { - return new RESTCatalog(context.options()); + Options options = context.options(); + if (options.getOptional(CatalogOptions.WAREHOUSE).isPresent()) { + throw new IllegalArgumentException("Can not config warehouse in RESTCatalog."); + } + return new RESTCatalog(options); } } diff --git a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java index b2d108c31c7b..7bacc9346632 100644 --- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java @@ -58,13 +58,14 @@ public class RESTCatalogTest { private MockWebServer mockWebServer; private RESTCatalog restCatalog; private RESTCatalog mockRestCatalog; + private Options options; @Before public void setUp() throws IOException { mockWebServer = new MockWebServer(); mockWebServer.start(); String baseUrl = mockWebServer.url("").toString(); - Options options = new Options(); + options = new Options(); options.set(RESTCatalogOptions.URI, baseUrl); String initToken = "init_token"; options.set(RESTCatalogOptions.TOKEN, initToken); @@ -97,7 +98,9 @@ public void testGetConfig() { String mockResponse = String.format("{\"defaults\": {\"%s\": \"%s\"}}", key, value); mockResponse(mockResponse, 200); Map header = new HashMap<>(); - Map response = restCatalog.fetchOptionsFromServer(header, new HashMap<>()); + RESTClient client = RESTCatalog.getClient(options); + Map response = + RESTCatalog.fetchOptionsFromServer(client, header, new HashMap<>()); assertEquals(value, response.get(key)); }