Skip to content

Commit

Permalink
add ut
Browse files Browse the repository at this point in the history
update open api add table scheme define

delete no need code and fix ut fail

update update table api

add comment for getDataOrFormatTable
  • Loading branch information
jerry-024 committed Dec 24, 2024
1 parent 1d5a8b2 commit f5c0286
Show file tree
Hide file tree
Showing 11 changed files with 383 additions and 195 deletions.
134 changes: 69 additions & 65 deletions paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,9 @@
import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;

import java.io.IOException;
import java.io.UncheckedIOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
Expand All @@ -90,14 +91,15 @@
/** A catalog implementation for REST. */
public class RESTCatalog implements Catalog {

private static final Logger LOG = LoggerFactory.getLogger(RESTCatalog.class);
private static final ObjectMapper OBJECT_MAPPER = RESTObjectMapper.create();

private final RESTClient client;
private final ResourcePaths resourcePaths;
private final Map<String, String> baseHeader;
private final AuthSession catalogAuth;
private final CatalogContext context;
private final FileIO fileIO;
private final Optional<FileIO> fileIOOptional;

private volatile ScheduledExecutorService refreshExecutor = null;

Expand Down Expand Up @@ -142,23 +144,21 @@ public RESTCatalog(CatalogContext catalogContext) {
options, catalogContext.preferIO(), catalogContext.fallbackIO());
this.resourcePaths =
ResourcePaths.forCatalogProperties(options.get(RESTCatalogInternalOptions.PREFIX));
this.fileIO = getFileIOFromOptions(context);
this.fileIOOptional = getFileIOFromOptions(context);
}

// todo: whether it's ok
private static FileIO getFileIOFromOptions(CatalogContext context) {
Options options = context.options();
String warehouseStr = options.get(CatalogOptions.WAREHOUSE);
Path warehousePath = new Path(warehouseStr);
FileIO fileIO;
CatalogContext contextWithNewOptions =
CatalogContext.create(options, context.preferIO(), context.fallbackIO());
private static Optional<FileIO> getFileIOFromOptions(CatalogContext context) {
try {
fileIO = FileIO.get(warehousePath, contextWithNewOptions);
} catch (IOException e) {
throw new UncheckedIOException(e);
Options options = context.options();
String warehouseStr = options.get(CatalogOptions.WAREHOUSE);
Path warehousePath = new Path(warehouseStr);
CatalogContext contextWithNewOptions =
CatalogContext.create(options, context.preferIO(), context.fallbackIO());
return Optional.of(FileIO.get(warehousePath, contextWithNewOptions));
} catch (Exception ignore) {
LOG.warn("Can not get FileIO from options.");
}
return fileIO;
return Optional.empty();
}

@Override
Expand All @@ -173,7 +173,10 @@ public Map<String, String> options() {

@Override
public FileIO fileIO() {
return this.fileIO;
if (this.fileIOOptional.isPresent()) {
return this.fileIOOptional.get();
}
throw new RuntimeException("FileIO is not configured.");
}

@Override
Expand Down Expand Up @@ -299,7 +302,9 @@ public void createTable(Identifier identifier, Schema schema, boolean ignoreIfEx
GetTableResponse.class,
headers());
} catch (AlreadyExistsException e) {
throw new TableAlreadyExistException(identifier);
if (!ignoreIfExists) {
throw new TableAlreadyExistException(identifier);
}
}
}

Expand All @@ -324,7 +329,7 @@ public void alterTable(
Identifier identifier, List<SchemaChange> changes, boolean ignoreIfNotExists)
throws TableNotExistException, ColumnAlreadyExistException, ColumnNotExistException {
try {
updateTable(identifier, null, changes);
updateTable(identifier, identifier, changes);
} catch (NoSuchResourceException e) {
if (!ignoreIfNotExists) {
throw new TableNotExistException(identifier);
Expand Down Expand Up @@ -389,12 +394,46 @@ Map<String, String> fetchOptionsFromServer(
return response.merge(clientProperties);
}

private static Map<String, String> configHeaders(Map<String, String> properties) {
return RESTUtil.extractPrefixMap(properties, "header.");
@VisibleForTesting
void updateTable(
Identifier fromTable, Identifier newTableIdentifier, List<SchemaChange> changes) {
UpdateTableRequest request =
new UpdateTableRequest(newTableIdentifier, new SchemaChanges(changes));
client.post(
resourcePaths.table(fromTable.getDatabaseName(), fromTable.getTableName()),
request,
GetTableResponse.class,
headers());
}

private Map<String, String> headers() {
return catalogAuth.getHeaders();
@VisibleForTesting
Table getDataOrFormatTable(Identifier identifier) throws TableNotExistException {
Preconditions.checkArgument(identifier.getSystemTableName() == null);
TableSchema tableSchema = getDataTableSchema(identifier);
Lock.Factory lockFactory =
Lock.factory(
lockFactory(context.options(), fileIO(), Optional.empty()).orElse(null),
lockContext(context.options()).orElse(null),
identifier);
// MetastoreClient is not used in RESTCatalog so null is ok.
FileStoreTable table =
FileStoreTableFactory.create(
fileIO(),
newTableLocation(warehouse(), identifier),
tableSchema,
new CatalogEnvironment(identifier, null, lockFactory, null));
CoreOptions options = table.coreOptions();
if (options.type() == TableType.OBJECT_TABLE) {
String objectLocation = options.objectLocation();
checkNotNull(objectLocation, "Object location should not be null for object table.");
table =
ObjectTable.builder()
.underlyingTable(table)
.objectLocation(objectLocation)
.objectFileIO(this.fileIO())
.build();
}
return table;
}

protected TableSchema getDataTableSchema(Identifier identifier) throws TableNotExistException {
Expand All @@ -413,15 +452,12 @@ protected TableSchema getDataTableSchema(Identifier identifier) throws TableNotE
}
}

// todo: how know which exception to throw
private void updateTable(Identifier fromTable, Identifier toTable, List<SchemaChange> changes) {
UpdateTableRequest request =
new UpdateTableRequest(fromTable, toTable, new SchemaChanges(changes));
client.post(
resourcePaths.table(fromTable.getDatabaseName(), fromTable.getTableName()),
request,
GetTableResponse.class,
headers());
private static Map<String, String> configHeaders(Map<String, String> properties) {
return RESTUtil.extractPrefixMap(properties, "header.");
}

private Map<String, String> headers() {
return catalogAuth.getHeaders();
}

private Table getAllInSystemDatabase(Identifier identifier) throws TableNotExistException {
Expand All @@ -447,7 +483,7 @@ private Table getAllInSystemDatabase(Identifier identifier) throws TableNotExist
};
Table table =
SystemTableLoader.loadGlobal(
tableName, fileIO, getAllTablePathsFunction, context.options());
tableName, fileIO(), getAllTablePathsFunction, context.options());
if (table == null) {
throw new TableNotExistException(identifier);
}
Expand All @@ -465,38 +501,6 @@ private Table getSystemTable(Identifier identifier) throws TableNotExistExceptio
return CatalogUtils.getSystemTable(identifier, originTable);
}

private Table getDataOrFormatTable(Identifier identifier) throws TableNotExistException {
Preconditions.checkArgument(identifier.getSystemTableName() == null);
TableSchema tableSchema = getDataTableSchema(identifier);
String uuid = null;
FileStoreTable table =
FileStoreTableFactory.create(
fileIO,
newTableLocation(warehouse(), identifier),
tableSchema,
new CatalogEnvironment(
identifier,
uuid,
Lock.factory(
lockFactory(context.options(), fileIO, Optional.empty())
.orElse(null),
lockContext(context.options()).orElse(null),
identifier),
null)); // todo: whether need MetastoreClient.Factory
CoreOptions options = table.coreOptions();
if (options.type() == TableType.OBJECT_TABLE) {
String objectLocation = options.objectLocation();
checkNotNull(objectLocation, "Object location should not be null for object table.");
table =
ObjectTable.builder()
.underlyingTable(table)
.objectLocation(objectLocation)
.objectFileIO(this.fileIO)
.build();
}
return table;
}

private ScheduledExecutorService tokenRefreshExecutor() {
if (refreshExecutor == null) {
synchronized (this) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.options.Options;

/** Factory to create {@link RESTCatalog}. */
public class RESTCatalogFactory implements CatalogFactory {
Expand All @@ -35,10 +33,6 @@ public String identifier() {

@Override
public Catalog create(CatalogContext context) {
Options options = context.options();
if (options.getOptional(CatalogOptions.WAREHOUSE).isPresent()) {
throw new IllegalArgumentException("Can not config warehouse in RESTCatalog.");
}
return new RESTCatalog(context);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package org.apache.paimon.rest;

import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.schema.TableSchemaSerializer;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypeJsonParser;
Expand All @@ -44,6 +46,11 @@ public static ObjectMapper create() {

public static Module createPaimonRestJacksonModule() {
SimpleModule module = new SimpleModule("Paimon_REST");
registerJsonObjects(
module,
TableSchema.class,
TableSchemaSerializer.INSTANCE,
TableSchemaSerializer.INSTANCE);
registerJsonObjects(
module,
DataField.class,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/** Schema changes to serialize List of SchemaChange . */
@JsonIgnoreProperties(ignoreUnknown = true)
Expand Down Expand Up @@ -201,4 +202,37 @@ public List<SchemaChange.UpdateColumnComment> getUpdateColumnComments() {
public List<SchemaChange.Move> getUpdateColumnPositions() {
return updateColumnPositions;
}

@Override
public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) {
return false;
}
SchemaChanges that = (SchemaChanges) o;
return Objects.equals(setOptions, that.setOptions)
&& Objects.equals(removeOptions, that.removeOptions)
&& Objects.equals(comment, that.comment)
&& Objects.equals(addColumns, that.addColumns)
&& Objects.equals(renameColumns, that.renameColumns)
&& Objects.equals(dropColumns, that.dropColumns)
&& Objects.equals(updateColumnTypes, that.updateColumnTypes)
&& Objects.equals(updateColumnNullabilities, that.updateColumnNullabilities)
&& Objects.equals(updateColumnComments, that.updateColumnComments)
&& Objects.equals(updateColumnPositions, that.updateColumnPositions);
}

@Override
public int hashCode() {
return Objects.hash(
setOptions,
removeOptions,
comment,
addColumns,
renameColumns,
dropColumns,
updateColumnTypes,
updateColumnNullabilities,
updateColumnComments,
updateColumnPositions);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,37 +28,26 @@
/** Request for updating table. */
public class UpdateTableRequest implements RESTRequest {

private static final String FIELD_FROM_IDENTIFIER_NAME = "from";
private static final String FIELD_TO_IDENTIFIER_NAME = "to";
private static final String FIELD_SCHEMA_CHANGES_NAME = "changes";
private static final String FIELD_IDENTIFIER_NAME = "identifier-change";
private static final String FIELD_SCHEMA_CHANGES_NAME = "schema-changes";

@JsonProperty(FIELD_FROM_IDENTIFIER_NAME)
private Identifier fromIdentifier;

@JsonProperty(FIELD_TO_IDENTIFIER_NAME)
private Identifier toIdentifier;
@JsonProperty(FIELD_IDENTIFIER_NAME)
private Identifier identifierChange;

@JsonProperty(FIELD_SCHEMA_CHANGES_NAME)
private SchemaChanges changes;

@JsonCreator
public UpdateTableRequest(
@JsonProperty(FIELD_FROM_IDENTIFIER_NAME) Identifier fromIdentifier,
@JsonProperty(FIELD_TO_IDENTIFIER_NAME) Identifier toIdentifier,
@JsonProperty(FIELD_IDENTIFIER_NAME) Identifier identifierChange,
@JsonProperty(FIELD_SCHEMA_CHANGES_NAME) SchemaChanges changes) {
this.fromIdentifier = fromIdentifier;
this.toIdentifier = toIdentifier;
this.identifierChange = identifierChange;
this.changes = changes;
}

@JsonGetter(FIELD_FROM_IDENTIFIER_NAME)
public Identifier getFromIdentifier() {
return fromIdentifier;
}

@JsonGetter(FIELD_TO_IDENTIFIER_NAME)
public Identifier getToIdentifier() {
return toIdentifier;
@JsonGetter(FIELD_IDENTIFIER_NAME)
public Identifier getIdentifierChange() {
return identifierChange;
}

@JsonGetter(FIELD_SCHEMA_CHANGES_NAME)
Expand Down
Loading

0 comments on commit f5c0286

Please sign in to comment.