diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java index 5e9eec28c91a0..448853caa9e28 100644 --- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java @@ -66,8 +66,9 @@ import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList; import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper; -import java.io.IOException; -import java.io.UncheckedIOException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.time.Duration; import java.util.ArrayList; import java.util.HashMap; @@ -90,6 +91,7 @@ /** A catalog implementation for REST. */ public class RESTCatalog implements Catalog { + private static final Logger LOG = LoggerFactory.getLogger(RESTCatalog.class); private static final ObjectMapper OBJECT_MAPPER = RESTObjectMapper.create(); private final RESTClient client; @@ -97,7 +99,7 @@ public class RESTCatalog implements Catalog { private final Map baseHeader; private final AuthSession catalogAuth; private final CatalogContext context; - private final FileIO fileIO; + private final Optional fileIOOptional; private volatile ScheduledExecutorService refreshExecutor = null; @@ -142,23 +144,21 @@ public RESTCatalog(CatalogContext catalogContext) { options, catalogContext.preferIO(), catalogContext.fallbackIO()); this.resourcePaths = ResourcePaths.forCatalogProperties(options.get(RESTCatalogInternalOptions.PREFIX)); - this.fileIO = getFileIOFromOptions(context); + this.fileIOOptional = getFileIOFromOptions(context); } - // todo: whether it's ok - private static FileIO getFileIOFromOptions(CatalogContext context) { - Options options = context.options(); - String warehouseStr = options.get(CatalogOptions.WAREHOUSE); - Path warehousePath = new Path(warehouseStr); - FileIO fileIO; - CatalogContext contextWithNewOptions = - CatalogContext.create(options, context.preferIO(), context.fallbackIO()); + private static Optional getFileIOFromOptions(CatalogContext context) { try { - fileIO = FileIO.get(warehousePath, contextWithNewOptions); - } catch (IOException e) { - throw new UncheckedIOException(e); + Options options = context.options(); + String warehouseStr = options.get(CatalogOptions.WAREHOUSE); + Path warehousePath = new Path(warehouseStr); + CatalogContext contextWithNewOptions = + CatalogContext.create(options, context.preferIO(), context.fallbackIO()); + return Optional.of(FileIO.get(warehousePath, contextWithNewOptions)); + } catch (Exception ignore) { + LOG.warn("Can not get FileIO from options."); } - return fileIO; + return Optional.empty(); } @Override @@ -173,7 +173,10 @@ public Map options() { @Override public FileIO fileIO() { - return this.fileIO; + if (this.fileIOOptional.isPresent()) { + return this.fileIOOptional.get(); + } + throw new RuntimeException("FileIO is not configured."); } @Override @@ -391,7 +394,6 @@ Map fetchOptionsFromServer( return response.merge(clientProperties); } - // todo: how know which exception to throw @VisibleForTesting void updateTable(Identifier fromTable, Identifier toTable, List changes) { UpdateTableRequest request = @@ -410,14 +412,14 @@ Table getDataOrFormatTable(Identifier identifier) throws TableNotExistException String uuid = null; FileStoreTable table = FileStoreTableFactory.create( - fileIO, + fileIO(), newTableLocation(warehouse(), identifier), tableSchema, new CatalogEnvironment( identifier, uuid, Lock.factory( - lockFactory(context.options(), fileIO, Optional.empty()) + lockFactory(context.options(), fileIO(), Optional.empty()) .orElse(null), lockContext(context.options()).orElse(null), identifier), @@ -430,7 +432,7 @@ Table getDataOrFormatTable(Identifier identifier) throws TableNotExistException ObjectTable.builder() .underlyingTable(table) .objectLocation(objectLocation) - .objectFileIO(this.fileIO) + .objectFileIO(this.fileIO()) .build(); } return table; @@ -483,7 +485,7 @@ private Table getAllInSystemDatabase(Identifier identifier) throws TableNotExist }; Table table = SystemTableLoader.loadGlobal( - tableName, fileIO, getAllTablePathsFunction, context.options()); + tableName, fileIO(), getAllTablePathsFunction, context.options()); if (table == null) { throw new TableNotExistException(identifier); } diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalogFactory.java b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalogFactory.java index b03c9ca4248d3..aa6e6f4d41133 100644 --- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalogFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalogFactory.java @@ -21,8 +21,6 @@ import org.apache.paimon.catalog.Catalog; import org.apache.paimon.catalog.CatalogContext; import org.apache.paimon.catalog.CatalogFactory; -import org.apache.paimon.options.CatalogOptions; -import org.apache.paimon.options.Options; /** Factory to create {@link RESTCatalog}. */ public class RESTCatalogFactory implements CatalogFactory { @@ -35,10 +33,6 @@ public String identifier() { @Override public Catalog create(CatalogContext context) { - Options options = context.options(); - if (options.getOptional(CatalogOptions.WAREHOUSE).isPresent()) { - throw new IllegalArgumentException("Can not config warehouse in RESTCatalog."); - } return new RESTCatalog(context); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/requests/SchemaChanges.java b/paimon-core/src/main/java/org/apache/paimon/rest/requests/SchemaChanges.java index 5abe48049a36e..1c3e419f13d21 100644 --- a/paimon-core/src/main/java/org/apache/paimon/rest/requests/SchemaChanges.java +++ b/paimon-core/src/main/java/org/apache/paimon/rest/requests/SchemaChanges.java @@ -30,6 +30,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; /** Schema changes to serialize List of SchemaChange . */ @JsonIgnoreProperties(ignoreUnknown = true) @@ -201,4 +202,37 @@ public List getUpdateColumnComments() { public List getUpdateColumnPositions() { return updateColumnPositions; } + + @Override + public boolean equals(Object o) { + if (o == null || getClass() != o.getClass()) { + return false; + } + SchemaChanges that = (SchemaChanges) o; + return Objects.equals(setOptions, that.setOptions) + && Objects.equals(removeOptions, that.removeOptions) + && Objects.equals(comment, that.comment) + && Objects.equals(addColumns, that.addColumns) + && Objects.equals(renameColumns, that.renameColumns) + && Objects.equals(dropColumns, that.dropColumns) + && Objects.equals(updateColumnTypes, that.updateColumnTypes) + && Objects.equals(updateColumnNullabilities, that.updateColumnNullabilities) + && Objects.equals(updateColumnComments, that.updateColumnComments) + && Objects.equals(updateColumnPositions, that.updateColumnPositions); + } + + @Override + public int hashCode() { + return Objects.hash( + setOptions, + removeOptions, + comment, + addColumns, + renameColumns, + dropColumns, + updateColumnTypes, + updateColumnNullabilities, + updateColumnComments, + updateColumnPositions); + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaChange.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaChange.java index de6842908b73e..ec8f09e847347 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaChange.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaChange.java @@ -232,14 +232,14 @@ final class AddColumn implements SchemaChange { private static final long serialVersionUID = 1L; private static final String FIELD_FILED_NAMES = "field-names"; - private static final String FIELD_DATA_TYPES = "data-types"; + private static final String FIELD_DATA_TYPE = "data-type"; private static final String FIELD_COMMENT = "comment"; private static final String FIELD_MOVE = "move"; @JsonProperty(FIELD_FILED_NAMES) private final String[] fieldNames; - @JsonProperty(FIELD_DATA_TYPES) + @JsonProperty(FIELD_DATA_TYPE) private final DataType dataType; @JsonProperty(FIELD_COMMENT) @@ -251,7 +251,7 @@ final class AddColumn implements SchemaChange { @JsonCreator private AddColumn( @JsonProperty(FIELD_FILED_NAMES) String[] fieldNames, - @JsonProperty(FIELD_DATA_TYPES) DataType dataType, + @JsonProperty(FIELD_DATA_TYPE) DataType dataType, @JsonProperty(FIELD_COMMENT) String description, @JsonProperty(FIELD_MOVE) Move move) { this.fieldNames = fieldNames; @@ -283,7 +283,7 @@ public String[] getFieldNames() { return fieldNames; } - @JsonGetter(FIELD_DATA_TYPES) + @JsonGetter(FIELD_DATA_TYPE) public DataType getDataType() { return dataType; } @@ -312,7 +312,7 @@ public boolean equals(Object o) { return Arrays.equals(fieldNames, addColumn.fieldNames) && dataType.equals(addColumn.dataType) && Objects.equals(description, addColumn.description) - && move.equals(addColumn.move); + && Objects.equals(move, addColumn.move); } @Override @@ -435,7 +435,7 @@ final class UpdateColumnType implements SchemaChange { private static final long serialVersionUID = 1L; private static final String FIELD_FILED_NAMES = "field-names"; - private static final String FIELD_NEW_DATA_TYPE = "new-data-types"; + private static final String FIELD_NEW_DATA_TYPE = "new-data-type"; private static final String FIELD_KEEP_NULLABILITY = "keep-nullability"; @JsonProperty(FIELD_FILED_NAMES)