Skip to content

Commit

Permalink
[rest] Remove useless fetchOptionsFromServer in RESTCatalog
Browse files Browse the repository at this point in the history
  • Loading branch information
JingsongLi committed Dec 30, 2024
1 parent 88be864 commit df38b9e
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 67 deletions.
101 changes: 45 additions & 56 deletions paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -104,22 +104,21 @@ public class RESTCatalog implements Catalog {
private final RESTClient client;
private final ResourcePaths resourcePaths;
private final AuthSession catalogAuth;
private final CatalogContext context;
private final Options options;
private final FileIO fileIO;

private volatile ScheduledExecutorService refreshExecutor = null;

public RESTCatalog(CatalogContext catalogContext) {
Options catalogOptions = catalogContext.options();
if (catalogOptions.getOptional(CatalogOptions.WAREHOUSE).isPresent()) {
public RESTCatalog(CatalogContext context) {
if (context.options().getOptional(CatalogOptions.WAREHOUSE).isPresent()) {
throw new IllegalArgumentException("Can not config warehouse in RESTCatalog.");
}
String uri = catalogOptions.get(RESTCatalogOptions.URI);
String uri = context.options().get(RESTCatalogOptions.URI);
Optional<Duration> connectTimeout =
catalogOptions.getOptional(RESTCatalogOptions.CONNECTION_TIMEOUT);
context.options().getOptional(RESTCatalogOptions.CONNECTION_TIMEOUT);
Optional<Duration> readTimeout =
catalogOptions.getOptional(RESTCatalogOptions.READ_TIMEOUT);
Integer threadPoolSize = catalogOptions.get(RESTCatalogOptions.THREAD_POOL_SIZE);
context.options().getOptional(RESTCatalogOptions.READ_TIMEOUT);
Integer threadPoolSize = context.options().get(RESTCatalogOptions.THREAD_POOL_SIZE);
HttpClientOptions httpClientOptions =
new HttpClientOptions(
uri,
Expand All @@ -129,39 +128,49 @@ public RESTCatalog(CatalogContext catalogContext) {
threadPoolSize,
DefaultErrorHandler.getInstance());
this.client = new HttpClient(httpClientOptions);
Map<String, String> baseHeader = configHeaders(catalogOptions.toMap());
Map<String, String> baseHeader = configHeaders(context.options().toMap());
CredentialsProvider credentialsProvider =
CredentialsProviderFactory.createCredentialsProvider(
catalogOptions, RESTCatalog.class.getClassLoader());
context.options(), RESTCatalog.class.getClassLoader());
if (credentialsProvider.keepRefreshed()) {
this.catalogAuth =
AuthSession.fromRefreshCredentialsProvider(
tokenRefreshExecutor(), baseHeader, credentialsProvider);

} else {
this.catalogAuth = new AuthSession(baseHeader, credentialsProvider);
}
Map<String, String> initHeaders =
RESTUtil.merge(
configHeaders(catalogOptions.toMap()), this.catalogAuth.getHeaders());
Options options =
new Options(fetchOptionsFromServer(initHeaders, catalogContext.options().toMap()));
this.context =
CatalogContext.create(
options, catalogContext.preferIO(), catalogContext.fallbackIO());
configHeaders(context.options().toMap()), this.catalogAuth.getHeaders());

this.options =
new Options(
client.get(ResourcePaths.V1_CONFIG, ConfigResponse.class, initHeaders)
.merge(context.options().toMap()));
this.resourcePaths =
ResourcePaths.forCatalogProperties(options.get(RESTCatalogInternalOptions.PREFIX));
this.fileIO = getFileIOFromOptions(context);

try {
String warehouseStr = options.get(CatalogOptions.WAREHOUSE);
this.fileIO =
FileIO.get(
new Path(warehouseStr),
CatalogContext.create(
options, context.preferIO(), context.fallbackIO()));
} catch (IOException e) {
LOG.warn("Can not get FileIO from options.");
throw new RuntimeException(e);
}
}

@Override
public String warehouse() {
return context.options().get(CatalogOptions.WAREHOUSE);
return options.get(CatalogOptions.WAREHOUSE);
}

@Override
public Map<String, String> options() {
return context.options().toMap();
return options.toMap();
}

@Override
Expand Down Expand Up @@ -456,7 +465,7 @@ public List<PartitionEntry> listPartitions(Identifier identifier)

@Override
public boolean caseSensitive() {
return context.options().getOptional(CASE_SENSITIVE).orElse(true);
return options.getOptional(CASE_SENSITIVE).orElse(true);
}

@Override
Expand All @@ -469,16 +478,23 @@ public void close() throws Exception {
}
}

protected Map<String, String> fetchOptionsFromServer(
Map<String, String> headers, Map<String, String> clientProperties) {
ConfigResponse response =
client.get(ResourcePaths.V1_CONFIG, ConfigResponse.class, headers);
return response.merge(clientProperties);
}

private Table getDataOrFormatTable(Identifier identifier) throws TableNotExistException {
Preconditions.checkArgument(identifier.getSystemTableName() == null);
GetTableResponse response = getTableResponse(identifier);

GetTableResponse response;
try {
response =
client.get(
resourcePaths.table(
identifier.getDatabaseName(), identifier.getTableName()),
GetTableResponse.class,
headers());
} catch (NoSuchResourceException e) {
throw new TableNotExistException(identifier);
} catch (ForbiddenException e) {
throw new TableNoPermissionException(identifier, e);
}

FileStoreTable table =
FileStoreTableFactory.create(
fileIO(),
Expand All @@ -499,19 +515,6 @@ private Table getDataOrFormatTable(Identifier identifier) throws TableNotExistEx
return table;
}

private GetTableResponse getTableResponse(Identifier identifier) throws TableNotExistException {
try {
return client.get(
resourcePaths.table(identifier.getDatabaseName(), identifier.getTableName()),
GetTableResponse.class,
headers());
} catch (NoSuchResourceException e) {
throw new TableNotExistException(identifier);
} catch (ForbiddenException e) {
throw new TableNoPermissionException(identifier, e);
}
}

private static Map<String, String> configHeaders(Map<String, String> properties) {
return RESTUtil.extractPrefixMap(properties, "header.");
}
Expand Down Expand Up @@ -542,18 +545,4 @@ private ScheduledExecutorService tokenRefreshExecutor() {

return refreshExecutor;
}

private static FileIO getFileIOFromOptions(CatalogContext context) {
try {
Options options = context.options();
String warehouseStr = options.get(CatalogOptions.WAREHOUSE);
Path warehousePath = new Path(warehouseStr);
CatalogContext contextWithNewOptions =
CatalogContext.create(options, context.preferIO(), context.fallbackIO());
return FileIO.get(warehousePath, contextWithNewOptions);
} catch (IOException e) {
LOG.warn("Can not get FileIO from options.");
throw new RuntimeException(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -93,17 +93,6 @@ public void testInitFailWhenDefineWarehouse() {
() -> new RESTCatalog(CatalogContext.create(options)));
}

@Test
public void testGetConfig() {
String key = "a";
String value = "b";
String mockResponse = String.format("{\"defaults\": {\"%s\": \"%s\"}}", key, value);
mockResponse(mockResponse, 200);
Map<String, String> header = new HashMap<>();
Map<String, String> response = restCatalog.fetchOptionsFromServer(header, new HashMap<>());
assertEquals(value, response.get(key));
}

@Test
public void testListDatabases() throws JsonProcessingException {
String name = MockRESTMessage.databaseName();
Expand Down

0 comments on commit df38b9e

Please sign in to comment.