Skip to content

Commit

Permalink
[flink] Add Tests and ITCases in flink for RESTCatalog (apache#4805)
Browse files Browse the repository at this point in the history
  • Loading branch information
jerry-024 authored Jan 9, 2025
1 parent 8ca41fc commit fa52896
Show file tree
Hide file tree
Showing 32 changed files with 966 additions and 536 deletions.
1 change: 0 additions & 1 deletion paimon-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ under the License.

<properties>
<frocksdbjni.version>6.20.3-ververica-2.0</frocksdbjni.version>
<okhttp.version>4.12.0</okhttp.version>
</properties>

<dependencies>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import static org.apache.paimon.catalog.CatalogUtils.checkNotSystemTable;
import static org.apache.paimon.catalog.CatalogUtils.isSystemDatabase;
import static org.apache.paimon.catalog.CatalogUtils.listPartitionsFromFileSystem;
import static org.apache.paimon.catalog.CatalogUtils.validateAutoCreateClose;
import static org.apache.paimon.options.CatalogOptions.LOCK_ENABLED;
import static org.apache.paimon.options.CatalogOptions.LOCK_TYPE;
import static org.apache.paimon.table.system.AllTableOptionsTable.ALL_TABLE_OPTIONS;
Expand Down Expand Up @@ -504,17 +505,6 @@ private void copyTableDefaultOptions(Map<String, String> options) {
tableDefaultOptions.forEach(options::putIfAbsent);
}

private void validateAutoCreateClose(Map<String, String> options) {
checkArgument(
!Boolean.parseBoolean(
options.getOrDefault(
CoreOptions.AUTO_CREATE.key(),
CoreOptions.AUTO_CREATE.defaultValue().toString())),
String.format(
"The value of %s property should be %s.",
CoreOptions.AUTO_CREATE.key(), Boolean.FALSE));
}

private void validateCustomTablePath(Map<String, String> options) {
if (!allowCustomTablePath() && options.containsKey(CoreOptions.PATH.key())) {
throw new UnsupportedOperationException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.paimon.catalog;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.fs.Path;
import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.options.Options;
Expand All @@ -38,6 +39,7 @@
import static org.apache.paimon.catalog.Catalog.SYSTEM_DATABASE_NAME;
import static org.apache.paimon.catalog.Catalog.TABLE_DEFAULT_OPTION_PREFIX;
import static org.apache.paimon.options.OptionsUtils.convertToPropertiesPrefixKey;
import static org.apache.paimon.utils.Preconditions.checkArgument;

/** Utils for {@link Catalog}. */
public class CatalogUtils {
Expand Down Expand Up @@ -108,6 +110,17 @@ public static void checkNotBranch(Identifier identifier, String method) {
}
}

public static void validateAutoCreateClose(Map<String, String> options) {
checkArgument(
!Boolean.parseBoolean(
options.getOrDefault(
CoreOptions.AUTO_CREATE.key(),
CoreOptions.AUTO_CREATE.defaultValue().toString())),
String.format(
"The value of %s property should be %s.",
CoreOptions.AUTO_CREATE.key(), Boolean.FALSE));
}

public static Table createSystemTable(Identifier identifier, Table originTable)
throws Catalog.TableNotExistException {
if (!(originTable instanceof FileStoreTable)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.paimon.rest.exceptions.RESTException;
import org.apache.paimon.rest.exceptions.ServiceFailureException;
import org.apache.paimon.rest.exceptions.ServiceUnavailableException;
import org.apache.paimon.rest.exceptions.UnsupportedOperationException;
import org.apache.paimon.rest.responses.ErrorResponse;

/** Default error handler. */
Expand All @@ -43,18 +44,20 @@ public void accept(ErrorResponse error) {
String message = error.getMessage();
switch (code) {
case 400:
throw new BadRequestException(String.format("Malformed request: %s", message));
throw new BadRequestException(String.format("%s", message));
case 401:
throw new NotAuthorizedException("Not authorized: %s", message);
case 403:
throw new ForbiddenException("Forbidden: %s", message);
case 404:
throw new NoSuchResourceException("%s", message);
throw new NoSuchResourceException(
error.getResourceType(), error.getResourceName(), "%s", message);
case 405:
case 406:
break;
case 409:
throw new AlreadyExistsException("%s", message);
throw new AlreadyExistsException(
error.getResourceType(), error.getResourceName(), "%s", message);
case 500:
throw new ServiceFailureException("Server error: %s", message);
case 501:
Expand Down
23 changes: 18 additions & 5 deletions paimon-core/src/main/java/org/apache/paimon/rest/HttpClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,11 @@ public HttpClient(Options options) {
}

public HttpClient(HttpClientOptions httpClientOptions) {
this.uri = httpClientOptions.uri();
if (httpClientOptions.uri() != null && httpClientOptions.uri().endsWith("/")) {
this.uri = httpClientOptions.uri().substring(0, httpClientOptions.uri().length() - 1);
} else {
this.uri = httpClientOptions.uri();
}
this.okHttpClient = createHttpClient(httpClientOptions);
this.errorHandler = DefaultErrorHandler.getInstance();
}
Expand Down Expand Up @@ -132,10 +136,19 @@ private <T extends RESTResponse> T exec(Request request, Class<T> responseType)
try (Response response = okHttpClient.newCall(request).execute()) {
String responseBodyStr = response.body() != null ? response.body().string() : null;
if (!response.isSuccessful()) {
ErrorResponse error =
new ErrorResponse(
responseBodyStr != null ? responseBodyStr : "response body is null",
response.code());
ErrorResponse error;
try {
error = OBJECT_MAPPER.readValue(responseBodyStr, ErrorResponse.class);
} catch (JsonProcessingException e) {
error =
new ErrorResponse(
null,
null,
responseBodyStr != null
? responseBodyStr
: "response body is null",
response.code());
}
errorHandler.accept(error);
}
if (responseType != null && responseBodyStr != null) {
Expand Down
57 changes: 50 additions & 7 deletions paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,10 @@
import org.apache.paimon.partition.Partition;
import org.apache.paimon.rest.auth.AuthSession;
import org.apache.paimon.rest.exceptions.AlreadyExistsException;
import org.apache.paimon.rest.exceptions.BadRequestException;
import org.apache.paimon.rest.exceptions.ForbiddenException;
import org.apache.paimon.rest.exceptions.NoSuchResourceException;
import org.apache.paimon.rest.exceptions.ServiceFailureException;
import org.apache.paimon.rest.requests.AlterDatabaseRequest;
import org.apache.paimon.rest.requests.AlterTableRequest;
import org.apache.paimon.rest.requests.CreateDatabaseRequest;
Expand All @@ -45,6 +47,7 @@
import org.apache.paimon.rest.responses.AlterDatabaseResponse;
import org.apache.paimon.rest.responses.ConfigResponse;
import org.apache.paimon.rest.responses.CreateDatabaseResponse;
import org.apache.paimon.rest.responses.ErrorResponseResourceType;
import org.apache.paimon.rest.responses.GetDatabaseResponse;
import org.apache.paimon.rest.responses.GetTableResponse;
import org.apache.paimon.rest.responses.ListDatabasesResponse;
Expand Down Expand Up @@ -75,9 +78,12 @@
import java.util.concurrent.ScheduledExecutorService;

import static org.apache.paimon.CoreOptions.METASTORE_PARTITIONED_TABLE;
import static org.apache.paimon.catalog.CatalogUtils.checkNotBranch;
import static org.apache.paimon.catalog.CatalogUtils.checkNotSystemDatabase;
import static org.apache.paimon.catalog.CatalogUtils.checkNotSystemTable;
import static org.apache.paimon.catalog.CatalogUtils.isSystemDatabase;
import static org.apache.paimon.catalog.CatalogUtils.listPartitionsFromFileSystem;
import static org.apache.paimon.catalog.CatalogUtils.validateAutoCreateClose;
import static org.apache.paimon.options.CatalogOptions.CASE_SENSITIVE;
import static org.apache.paimon.rest.RESTUtil.extractPrefixMap;
import static org.apache.paimon.rest.auth.AuthSession.createAuthSession;
Expand Down Expand Up @@ -209,7 +215,7 @@ public void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade
throw new DatabaseNotEmptyException(name);
}
client.delete(resourcePaths.database(name), headers());
} catch (NoSuchResourceException e) {
} catch (NoSuchResourceException | DatabaseNotExistException e) {
if (!ignoreIfNotExists) {
throw new DatabaseNotExistException(name);
}
Expand Down Expand Up @@ -249,12 +255,19 @@ public void alterDatabase(String name, List<PropertyChange> changes, boolean ign

@Override
public List<String> listTables(String databaseName) throws DatabaseNotExistException {
ListTablesResponse response =
client.get(resourcePaths.tables(databaseName), ListTablesResponse.class, headers());
if (response.getTables() != null) {
return response.getTables();
try {
ListTablesResponse response =
client.get(
resourcePaths.tables(databaseName),
ListTablesResponse.class,
headers());
if (response.getTables() != null) {
return response.getTables();
}
return ImmutableList.of();
} catch (NoSuchResourceException e) {
throw new DatabaseNotExistException(databaseName);
}
return ImmutableList.of();
}

@Override
Expand All @@ -272,6 +285,9 @@ public Table getTable(Identifier identifier) throws TableNotExistException {
public void createTable(Identifier identifier, Schema schema, boolean ignoreIfExists)
throws TableAlreadyExistException, DatabaseNotExistException {
try {
checkNotBranch(identifier, "createTable");
checkNotSystemTable(identifier, "createTable");
validateAutoCreateClose(schema.options());
CreateTableRequest request = new CreateTableRequest(identifier, schema);
client.post(
resourcePaths.tables(identifier.getDatabaseName()),
Expand All @@ -282,12 +298,24 @@ public void createTable(Identifier identifier, Schema schema, boolean ignoreIfEx
if (!ignoreIfExists) {
throw new TableAlreadyExistException(identifier);
}
} catch (NoSuchResourceException e) {
throw new DatabaseNotExistException(identifier.getDatabaseName());
} catch (BadRequestException e) {
throw new RuntimeException(new IllegalArgumentException(e.getMessage()));
} catch (IllegalArgumentException e) {
throw e;
} catch (Exception e) {
throw new RuntimeException(e);
}
}

@Override
public void renameTable(Identifier fromTable, Identifier toTable, boolean ignoreIfNotExists)
throws TableNotExistException, TableAlreadyExistException {
checkNotBranch(fromTable, "renameTable");
checkNotBranch(toTable, "renameTable");
checkNotSystemTable(fromTable, "renameTable");
checkNotSystemTable(toTable, "renameTable");
try {
RenameTableRequest request = new RenameTableRequest(toTable);
client.post(
Expand All @@ -311,6 +339,7 @@ public void renameTable(Identifier fromTable, Identifier toTable, boolean ignore
public void alterTable(
Identifier identifier, List<SchemaChange> changes, boolean ignoreIfNotExists)
throws TableNotExistException, ColumnAlreadyExistException, ColumnNotExistException {
checkNotSystemTable(identifier, "alterTable");
try {
AlterTableRequest request = new AlterTableRequest(changes);
client.post(
Expand All @@ -320,16 +349,30 @@ public void alterTable(
headers());
} catch (NoSuchResourceException e) {
if (!ignoreIfNotExists) {
throw new TableNotExistException(identifier);
if (e.resourceType() == ErrorResponseResourceType.TABLE) {
throw new TableNotExistException(identifier);
} else if (e.resourceType() == ErrorResponseResourceType.COLUMN) {
throw new ColumnNotExistException(identifier, e.resourceName());
}
}
} catch (AlreadyExistsException e) {
throw new ColumnAlreadyExistException(identifier, e.resourceName());
} catch (ForbiddenException e) {
throw new TableNoPermissionException(identifier, e);
} catch (org.apache.paimon.rest.exceptions.UnsupportedOperationException e) {
throw new UnsupportedOperationException(e.getMessage());
} catch (ServiceFailureException e) {
throw new IllegalStateException(e.getMessage());
} catch (BadRequestException e) {
throw new RuntimeException(new IllegalArgumentException(e.getMessage()));
}
}

@Override
public void dropTable(Identifier identifier, boolean ignoreIfNotExists)
throws TableNotExistException {
checkNotBranch(identifier, "dropTable");
checkNotSystemTable(identifier, "dropTable");
try {
client.delete(
resourcePaths.table(identifier.getDatabaseName(), identifier.getTableName()),
Expand Down
54 changes: 14 additions & 40 deletions paimon-core/src/main/java/org/apache/paimon/rest/ResourcePaths.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,17 @@

import org.apache.paimon.options.Options;

import java.util.StringJoiner;
import org.apache.paimon.shade.guava30.com.google.common.base.Joiner;

/** Resource paths for REST catalog. */
public class ResourcePaths {

public static final String V1_CONFIG = "/v1/config";
private static final StringJoiner SLASH = new StringJoiner("/");
private static final Joiner SLASH = Joiner.on("/").skipNulls();
private static final String V1 = "/v1";
private static final String DATABASES = "databases";
private static final String TABLES = "tables";

public static final String V1_CONFIG = V1 + "/config";

public static ResourcePaths forCatalogProperties(Options options) {
return new ResourcePaths(options.get(RESTCatalogInternalOptions.PREFIX));
Expand All @@ -39,60 +43,30 @@ public ResourcePaths(String prefix) {
}

public String databases() {
return SLASH.add("v1").add(prefix).add("databases").toString();
return SLASH.join(V1, prefix, DATABASES);
}

public String database(String databaseName) {
return SLASH.add("v1").add(prefix).add("databases").add(databaseName).toString();
return SLASH.join(V1, prefix, DATABASES, databaseName);
}

public String databaseProperties(String databaseName) {
return SLASH.add("v1")
.add(prefix)
.add("databases")
.add(databaseName)
.add("properties")
.toString();
return SLASH.join(V1, prefix, DATABASES, databaseName, "properties");
}

public String tables(String databaseName) {
return SLASH.add("v1")
.add(prefix)
.add("databases")
.add(databaseName)
.add("tables")
.toString();
return SLASH.join(V1, prefix, DATABASES, databaseName, TABLES);
}

public String table(String databaseName, String tableName) {
return SLASH.add("v1")
.add(prefix)
.add("databases")
.add(databaseName)
.add("tables")
.add(tableName)
.toString();
return SLASH.join(V1, prefix, DATABASES, databaseName, TABLES, tableName);
}

public String renameTable(String databaseName, String tableName) {
return SLASH.add("v1")
.add(prefix)
.add("databases")
.add(databaseName)
.add("tables")
.add(tableName)
.add("rename")
.toString();
return SLASH.join(V1, prefix, DATABASES, databaseName, TABLES, tableName, "rename");
}

public String partitions(String databaseName, String tableName) {
return SLASH.add("v1")
.add(prefix)
.add("databases")
.add(databaseName)
.add("tables")
.add(tableName)
.add("partitions")
.toString();
return SLASH.join(V1, prefix, DATABASES, databaseName, TABLES, tableName, "partitions");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ public class AuthSession {
private volatile Map<String, String> headers;

public AuthSession(Map<String, String> headers, CredentialsProvider credentialsProvider) {
this.headers = headers;
this.credentialsProvider = credentialsProvider;
this.headers = RESTUtil.merge(headers, this.credentialsProvider.authHeader());
}

public static AuthSession fromRefreshCredentialsProvider(
Expand Down
Loading

0 comments on commit fa52896

Please sign in to comment.