Skip to content

Commit

Permalink
change RESTCatalog extend abstract catalog
Browse files Browse the repository at this point in the history
  • Loading branch information
jerry-024 committed Dec 18, 2024
1 parent 37bb3bd commit 9eceaf9
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 33 deletions.
80 changes: 50 additions & 30 deletions paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.catalog.PropertyChange;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.options.Options;
import org.apache.paimon.rest.auth.AuthSession;
import org.apache.paimon.rest.auth.CredentialsProvider;
Expand Down Expand Up @@ -73,32 +72,28 @@ public class RESTCatalog extends AbstractCatalog {
private volatile ScheduledExecutorService refreshExecutor = null;

public RESTCatalog(Options options) {
this(null, options);
this(options, getClient(options), getCredentialsProvider(options));
}

public RESTCatalog(FileIO fileIO, Options options) {
super(fileIO, options);
if (options.getOptional(CatalogOptions.WAREHOUSE).isPresent()) {
throw new IllegalArgumentException("Can not config warehouse in RESTCatalog.");
}
String uri = options.get(RESTCatalogOptions.URI);
Optional<Duration> connectTimeout =
options.getOptional(RESTCatalogOptions.CONNECTION_TIMEOUT);
Optional<Duration> readTimeout = options.getOptional(RESTCatalogOptions.READ_TIMEOUT);
Integer threadPoolSize = options.get(RESTCatalogOptions.THREAD_POOL_SIZE);
HttpClientOptions httpClientOptions =
new HttpClientOptions(
uri,
connectTimeout,
readTimeout,
OBJECT_MAPPER,
threadPoolSize,
DefaultErrorHandler.getInstance());
this.client = new HttpClient(httpClientOptions);
this.baseHeader = configHeaders(options.toMap());
CredentialsProvider credentialsProvider =
CredentialsProviderFactory.createCredentialsProvider(
options, RESTCatalog.class.getClassLoader());
public RESTCatalog(
Options options, RESTClient client, CredentialsProvider credentialsProvider) {
this(
client,
credentialsProvider,
new Options(
fetchOptionsFromServer(
client,
RESTUtil.merge(
configHeaders(options.toMap()),
credentialsProvider.authHeader()),
options.toMap())));
}

public RESTCatalog(
RESTClient client, CredentialsProvider credentialsProvider, Options optionsWithServer) {
super(getFileIOFromOptions(optionsWithServer), optionsWithServer);
this.client = client;
this.baseHeader = configHeaders(optionsWithServer.toMap());
if (credentialsProvider.keepRefreshed()) {
this.catalogAuth =
AuthSession.fromRefreshCredentialsProvider(
Expand All @@ -107,9 +102,7 @@ public RESTCatalog(FileIO fileIO, Options options) {
} else {
this.catalogAuth = new AuthSession(this.baseHeader, credentialsProvider);
}
Map<String, String> initHeaders =
RESTUtil.merge(configHeaders(options.toMap()), this.catalogAuth.getHeaders());
this.options = new Options(fetchOptionsFromServer(initHeaders, options.toMap()));
this.options = optionsWithServer;
this.resourcePaths =
ResourcePaths.forCatalogProperties(
this.options.get(RESTCatalogInternalOptions.PREFIX));
Expand Down Expand Up @@ -234,13 +227,40 @@ public void close() throws Exception {
}

@VisibleForTesting
Map<String, String> fetchOptionsFromServer(
Map<String, String> headers, Map<String, String> clientProperties) {
static Map<String, String> fetchOptionsFromServer(
RESTClient client, Map<String, String> headers, Map<String, String> clientProperties) {
ConfigResponse response =
client.get(ResourcePaths.V1_CONFIG, ConfigResponse.class, headers);
return response.merge(clientProperties);
}

@VisibleForTesting
static RESTClient getClient(Options options) {
String uri = options.get(RESTCatalogOptions.URI);
Optional<Duration> connectTimeout =
options.getOptional(RESTCatalogOptions.CONNECTION_TIMEOUT);
Optional<Duration> readTimeout = options.getOptional(RESTCatalogOptions.READ_TIMEOUT);
Integer threadPoolSize = options.get(RESTCatalogOptions.THREAD_POOL_SIZE);
HttpClientOptions httpClientOptions =
new HttpClientOptions(
uri,
connectTimeout,
readTimeout,
OBJECT_MAPPER,
threadPoolSize,
DefaultErrorHandler.getInstance());
return new HttpClient(httpClientOptions);
}

private static FileIO getFileIOFromOptions(Options options) {
return null;
}

private static CredentialsProvider getCredentialsProvider(Options options) {
return CredentialsProviderFactory.createCredentialsProvider(
options, RESTCatalog.class.getClassLoader());
}

private static Map<String, String> configHeaders(Map<String, String> properties) {
return RESTUtil.extractPrefixMap(properties, "header.");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.options.Options;

/** Factory to create {@link RESTCatalog}. */
public class RESTCatalogFactory implements CatalogFactory {
Expand All @@ -33,6 +35,10 @@ public String identifier() {

@Override
public Catalog create(CatalogContext context) {
return new RESTCatalog(context.options());
Options options = context.options();
if (options.getOptional(CatalogOptions.WAREHOUSE).isPresent()) {
throw new IllegalArgumentException("Can not config warehouse in RESTCatalog.");
}
return new RESTCatalog(options);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,14 @@ public class RESTCatalogTest {
private MockWebServer mockWebServer;
private RESTCatalog restCatalog;
private RESTCatalog mockRestCatalog;
private Options options;

@Before
public void setUp() throws IOException {
mockWebServer = new MockWebServer();
mockWebServer.start();
String baseUrl = mockWebServer.url("").toString();
Options options = new Options();
options = new Options();
options.set(RESTCatalogOptions.URI, baseUrl);
String initToken = "init_token";
options.set(RESTCatalogOptions.TOKEN, initToken);
Expand Down Expand Up @@ -97,7 +98,9 @@ public void testGetConfig() {
String mockResponse = String.format("{\"defaults\": {\"%s\": \"%s\"}}", key, value);
mockResponse(mockResponse, 200);
Map<String, String> header = new HashMap<>();
Map<String, String> response = restCatalog.fetchOptionsFromServer(header, new HashMap<>());
RESTClient client = RESTCatalog.getClient(options);
Map<String, String> response =
RESTCatalog.fetchOptionsFromServer(client, header, new HashMap<>());
assertEquals(value, response.get(key));
}

Expand Down

0 comments on commit 9eceaf9

Please sign in to comment.