diff --git a/docs/content/concepts/table-types.md b/docs/content/concepts/table-types.md index 49e4479629a3..0a1ef6481170 100644 --- a/docs/content/concepts/table-types.md +++ b/docs/content/concepts/table-types.md @@ -33,7 +33,8 @@ Paimon supports table types: 3. view: metastore required, views in SQL are a kind of virtual table 4. format-table: file format table refers to a directory that contains multiple files of the same format, where operations on this table allow for reading or writing to these files, compatible with Hive tables -5. materialized-table: aimed at simplifying both batch and stream data pipelines, providing a consistent development +5. object table: provides metadata indexes for unstructured data objects in the specified Object Storage storage directory. +6. materialized-table: aimed at simplifying both batch and stream data pipelines, providing a consistent development experience, see [Flink Materialized Table](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/materialized-table/overview/) ## Table with PK @@ -169,6 +170,71 @@ CREATE TABLE my_parquet_table ( {{< /tabs >}} +## Object Table + +Object Table provides metadata indexes for unstructured data objects in the specified Object Storage storage directory. +Object tables allow users to analyze unstructured data in Object Storage: + +1. Use Python API to manipulate these unstructured data, such as converting images to PDF format. +2. Model functions can also be used to perform inference, and then the results of these operations can be concatenated + with other structured data in the Catalog. + +The object table is managed by Catalog and can also have access permissions and the ability to manage blood relations. + +{{< tabs "object-table" >}} + +{{< tab "Flink-SQL" >}} + +```sql +-- Create Object Table + +CREATE TABLE `my_object_table` WITH ( + 'type' = 'object-table', + 'object-location' = 'oss://my_bucket/my_location' +); + +-- Refresh Object Table + +CALL sys.refresh_object_table('mydb.my_object_table'); + +-- Query Object Table + +SELECT * FROM `my_object_table`; + +-- Query Object Table with Time Travel + +SELECT * FROM `my_object_table` /*+ OPTIONS('scan.snapshot-id' = '1') */; +``` + +{{< /tab >}} + +{{< tab "Spark-SQL" >}} + +```sql +-- Create Object Table + +CREATE TABLE `my_object_table` TBLPROPERTIES ( + 'type' = 'object-table', + 'object-location' = 'oss://my_bucket/my_location' +); + +-- Refresh Object Table + +CALL sys.refresh_object_table('mydb.my_object_table'); + +-- Query Object Table + +SELECT * FROM `my_object_table`; + +-- Query Object Table with Time Travel + +SELECT * FROM `my_object_table` VERSION AS OF 1; +``` + +{{< /tab >}} + +{{< /tabs >}} + ## Materialized Table Materialized Table aimed at simplifying both batch and stream data pipelines, providing a consistent development diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html index 6556867ac03e..050a1db500ad 100644 --- a/docs/layouts/shortcodes/generated/core_configuration.html +++ b/docs/layouts/shortcodes/generated/core_configuration.html @@ -527,6 +527,12 @@ Integer The number of sorted runs that trigger the stopping of writes, the default value is 'num-sorted-run.compaction-trigger' + 3. + +
object-location
+ (none) + String + The object location for object table. +
page-size
64 kb @@ -904,7 +910,7 @@
type
table

Enum

- Type of the table.

Possible values: + Type of the table.

Possible values:
write-buffer-for-append
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java index c69f0aae098e..338182091bbe 100644 --- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java @@ -1405,6 +1405,12 @@ public class CoreOptions implements Serializable { .withDescription( "Whether to enable asynchronous IO writing when writing files."); + public static final ConfigOption OBJECT_LOCATION = + key("object-location") + .stringType() + .noDefaultValue() + .withDescription("The object location for object table."); + @ExcludeFromDocumentation("Only used internally to support materialized table") public static final ConfigOption MATERIALIZED_TABLE_DEFINITION_QUERY = key("materialized-table.definition-query") @@ -1516,6 +1522,10 @@ public static Path path(Options options) { return new Path(options.get(PATH)); } + public TableType type() { + return options.get(TYPE); + } + public String formatType() { return normalizeFileFormat(options.get(FILE_FORMAT)); } @@ -1565,6 +1575,11 @@ public static FileFormat createFileFormat(Options options, ConfigOption return FileFormat.fromIdentifier(formatIdentifier, options); } + public String objectLocation() { + checkArgument(type() == TableType.OBJECT_TABLE, "Only object table has object location!"); + return options.get(OBJECT_LOCATION); + } + public Map fileCompressionPerLevel() { Map levelCompressions = options.get(FILE_COMPRESSION_PER_LEVEL); return levelCompressions.entrySet().stream() diff --git a/paimon-common/src/main/java/org/apache/paimon/TableType.java b/paimon-common/src/main/java/org/apache/paimon/TableType.java index d690d5db3700..d9ac020f793c 100644 --- a/paimon-common/src/main/java/org/apache/paimon/TableType.java +++ b/paimon-common/src/main/java/org/apache/paimon/TableType.java @@ -29,7 +29,12 @@ public enum TableType implements DescribedEnum { FORMAT_TABLE( "format-table", "A file format table refers to a directory that contains multiple files of the same format."), - MATERIALIZED_TABLE("materialized-table", "A materialized table."); + MATERIALIZED_TABLE( + "materialized-table", + "A materialized table combines normal Paimon table and materialized SQL."), + OBJECT_TABLE( + "object-table", "A object table combines normal Paimon table and object location."); + private final String value; private final String description; diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/FileStatus.java b/paimon-common/src/main/java/org/apache/paimon/fs/FileStatus.java index 8308f5205d66..c3e6cde9cf0b 100644 --- a/paimon-common/src/main/java/org/apache/paimon/fs/FileStatus.java +++ b/paimon-common/src/main/java/org/apache/paimon/fs/FileStatus.java @@ -20,6 +20,8 @@ import org.apache.paimon.annotation.Public; +import javax.annotation.Nullable; + /** * Interface that represents the client side information for a file independent of the file system. * @@ -56,4 +58,24 @@ public interface FileStatus { * milliseconds since the epoch (UTC January 1, 1970). */ long getModificationTime(); + + /** + * Get the last access time of the file. + * + * @return A long value representing the time the file was last accessed, measured in + * milliseconds since the epoch (UTC January 1, 1970). + */ + default long getAccessTime() { + return 0; + } + + /** + * Returns the owner of this file. + * + * @return the owner of this file + */ + @Nullable + default String getOwner() { + return null; + } } diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/hadoop/HadoopFileIO.java b/paimon-common/src/main/java/org/apache/paimon/fs/hadoop/HadoopFileIO.java index 70325ee69635..0a8d64a73b00 100644 --- a/paimon-common/src/main/java/org/apache/paimon/fs/hadoop/HadoopFileIO.java +++ b/paimon-common/src/main/java/org/apache/paimon/fs/hadoop/HadoopFileIO.java @@ -329,6 +329,16 @@ public Path getPath() { public long getModificationTime() { return status.getModificationTime(); } + + @Override + public long getAccessTime() { + return status.getAccessTime(); + } + + @Override + public String getOwner() { + return status.getOwner(); + } } // ============================== extra methods =================================== diff --git a/paimon-common/src/main/java/org/apache/paimon/types/DataTypes.java b/paimon-common/src/main/java/org/apache/paimon/types/DataTypes.java index 659212b064eb..b025b6a838e0 100644 --- a/paimon-common/src/main/java/org/apache/paimon/types/DataTypes.java +++ b/paimon-common/src/main/java/org/apache/paimon/types/DataTypes.java @@ -103,6 +103,10 @@ public static LocalZonedTimestampType TIMESTAMP_WITH_LOCAL_TIME_ZONE(int precisi return new LocalZonedTimestampType(precision); } + public static LocalZonedTimestampType TIMESTAMP_LTZ_MILLIS() { + return new LocalZonedTimestampType(3); + } + public static DecimalType DECIMAL(int precision, int scale) { return new DecimalType(precision, scale); } diff --git a/paimon-common/src/main/java/org/apache/paimon/types/RowType.java b/paimon-common/src/main/java/org/apache/paimon/types/RowType.java index 80e41be710e2..f3fce0db6df1 100644 --- a/paimon-common/src/main/java/org/apache/paimon/types/RowType.java +++ b/paimon-common/src/main/java/org/apache/paimon/types/RowType.java @@ -169,11 +169,16 @@ public int defaultSize() { } @Override - public DataType copy(boolean isNullable) { + public RowType copy(boolean isNullable) { return new RowType( isNullable, fields.stream().map(DataField::copy).collect(Collectors.toList())); } + @Override + public RowType notNull() { + return copy(false); + } + @Override public String asSQLString() { return withNullability( diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java index 4c36ad4db3c9..c2e4afe5d533 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java @@ -38,8 +38,10 @@ import org.apache.paimon.table.FileStoreTableFactory; import org.apache.paimon.table.FormatTable; import org.apache.paimon.table.Table; +import org.apache.paimon.table.object.ObjectTable; import org.apache.paimon.table.sink.BatchWriteBuilder; import org.apache.paimon.table.system.SystemTableLoader; +import org.apache.paimon.types.RowType; import org.apache.paimon.utils.Preconditions; import javax.annotation.Nullable; @@ -48,6 +50,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -56,7 +59,6 @@ import static org.apache.paimon.CoreOptions.TYPE; import static org.apache.paimon.CoreOptions.createCommitUser; -import static org.apache.paimon.TableType.FORMAT_TABLE; import static org.apache.paimon.options.CatalogOptions.ALLOW_UPPER_CASE; import static org.apache.paimon.options.CatalogOptions.LINEAGE_META; import static org.apache.paimon.options.CatalogOptions.LOCK_ENABLED; @@ -284,13 +286,35 @@ public void createTable(Identifier identifier, Schema schema, boolean ignoreIfEx copyTableDefaultOptions(schema.options()); - if (Options.fromMap(schema.options()).get(TYPE) == FORMAT_TABLE) { - createFormatTable(identifier, schema); - } else { - createTableImpl(identifier, schema); + switch (Options.fromMap(schema.options()).get(TYPE)) { + case TABLE: + case MATERIALIZED_TABLE: + createTableImpl(identifier, schema); + break; + case OBJECT_TABLE: + createObjectTable(identifier, schema); + break; + case FORMAT_TABLE: + createFormatTable(identifier, schema); + break; } } + private void createObjectTable(Identifier identifier, Schema schema) { + RowType rowType = schema.rowType(); + checkArgument( + rowType.getFields().isEmpty() + || new HashSet<>(ObjectTable.SCHEMA.getFields()) + .containsAll(rowType.getFields()), + "Schema of Object Table can be empty or %s, but is %s.", + ObjectTable.SCHEMA, + rowType); + checkArgument( + schema.options().containsKey(CoreOptions.OBJECT_LOCATION.key()), + "Object table should have object-location option."); + createTableImpl(identifier, schema.copy(ObjectTable.SCHEMA)); + } + protected abstract void createTableImpl(Identifier identifier, Schema schema); @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/Schema.java b/paimon-core/src/main/java/org/apache/paimon/schema/Schema.java index c6c79f4d4afd..9984e3feef0c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/Schema.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/Schema.java @@ -96,6 +96,10 @@ public String comment() { return comment; } + public Schema copy(RowType rowType) { + return new Schema(rowType.getFields(), partitionKeys, primaryKeys, options, comment); + } + private static List normalizeFields( List fields, List primaryKeys, List partitionKeys) { List fieldNames = fields.stream().map(DataField::name).collect(Collectors.toList()); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java index 423dc1726319..47d8777241d6 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java @@ -19,12 +19,14 @@ package org.apache.paimon.table; import org.apache.paimon.CoreOptions; +import org.apache.paimon.TableType; import org.apache.paimon.catalog.CatalogContext; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; import org.apache.paimon.options.Options; import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.schema.TableSchema; +import org.apache.paimon.table.object.ObjectTable; import org.apache.paimon.utils.StringUtils; import java.io.IOException; @@ -33,6 +35,7 @@ import static org.apache.paimon.CoreOptions.PATH; import static org.apache.paimon.utils.Preconditions.checkArgument; +import static org.apache.paimon.utils.Preconditions.checkNotNull; /** Factory to create {@link FileStoreTable}. */ public class FileStoreTableFactory { @@ -124,6 +127,17 @@ public static FileStoreTable createWithoutFallbackBranch( fileIO, tablePath, tableSchema, catalogEnvironment) : new PrimaryKeyFileStoreTable( fileIO, tablePath, tableSchema, catalogEnvironment); - return table.copy(dynamicOptions.toMap()); + table = table.copy(dynamicOptions.toMap()); + CoreOptions options = table.coreOptions(); + if (options.type() == TableType.OBJECT_TABLE) { + String objectLocation = options.objectLocation(); + checkNotNull(objectLocation, "Object location should not be null for object table."); + table = + ObjectTable.builder() + .underlyingTable(table) + .objectLocation(objectLocation) + .build(); + } + return table; } } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/object/ObjectRefresh.java b/paimon-core/src/main/java/org/apache/paimon/table/object/ObjectRefresh.java new file mode 100644 index 000000000000..326efbc0eac8 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/table/object/ObjectRefresh.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.table.object; + +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.GenericMap; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.data.Timestamp; +import org.apache.paimon.fs.FileIO; +import org.apache.paimon.fs.FileStatus; +import org.apache.paimon.fs.Path; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.sink.BatchTableCommit; +import org.apache.paimon.table.sink.BatchTableWrite; +import org.apache.paimon.table.sink.BatchWriteBuilder; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +/** Util class for refreshing object table. */ +public class ObjectRefresh { + + public static long refresh(ObjectTable table) throws Exception { + String location = table.objectLocation(); + FileStoreTable underlyingTable = table.underlyingTable(); + FileIO fileIO = underlyingTable.fileIO(); + + List fileCollector = new ArrayList<>(); + listAllFiles(fileIO, new Path(location), fileCollector); + + BatchWriteBuilder writeBuilder = underlyingTable.newBatchWriteBuilder().withOverwrite(); + try (BatchTableWrite write = writeBuilder.newWrite(); + BatchTableCommit commit = writeBuilder.newCommit()) { + for (FileStatus file : fileCollector) { + write.write(toRow(file)); + } + commit.commit(write.prepareCommit()); + } + + return fileCollector.size(); + } + + private static void listAllFiles(FileIO fileIO, Path directory, List fileCollector) + throws IOException { + FileStatus[] files = fileIO.listStatus(directory); + if (files == null) { + return; + } + + for (FileStatus file : files) { + if (file.isDir()) { + listAllFiles(fileIO, file.getPath(), fileCollector); + } else { + fileCollector.add(file); + } + } + } + + private static InternalRow toRow(FileStatus file) { + return toRow( + file.getPath().toString(), + file.getPath().getName(), + file.getLen(), + Timestamp.fromEpochMillis(file.getModificationTime()), + Timestamp.fromEpochMillis(file.getAccessTime()), + file.getOwner(), + null, + null, + null, + null, + null, + new GenericMap(Collections.emptyMap())); + } + + public static GenericRow toRow(Object... values) { + GenericRow row = new GenericRow(values.length); + + for (int i = 0; i < values.length; ++i) { + Object value = values[i]; + if (value instanceof String) { + value = BinaryString.fromString((String) value); + } + row.setField(i, value); + } + + return row; + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/table/object/ObjectTable.java b/paimon-core/src/main/java/org/apache/paimon/table/object/ObjectTable.java new file mode 100644 index 000000000000..65689108caae --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/table/object/ObjectTable.java @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.table.object; + +import org.apache.paimon.manifest.ManifestCacheFilter; +import org.apache.paimon.schema.TableSchema; +import org.apache.paimon.table.DelegatedFileStoreTable; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.sink.BatchWriteBuilder; +import org.apache.paimon.table.sink.StreamWriteBuilder; +import org.apache.paimon.table.sink.TableCommitImpl; +import org.apache.paimon.table.sink.TableWriteImpl; +import org.apache.paimon.types.DataTypes; +import org.apache.paimon.types.RowType; + +import java.util.HashSet; +import java.util.Map; + +import static org.apache.paimon.utils.Preconditions.checkArgument; + +/** + * A object table refers to a directory that contains multiple objects (files), Object table + * provides metadata indexes for unstructured data objects in this directory. Allowing users to + * analyze unstructured data in Object Storage. + * + *

Object Table stores the metadata of objects in the underlying table. + */ +public interface ObjectTable extends FileStoreTable { + + RowType SCHEMA = + RowType.builder() + .field("path", DataTypes.STRING().notNull()) + .field("name", DataTypes.STRING().notNull()) + .field("length", DataTypes.BIGINT().notNull()) + .field("mtime", DataTypes.TIMESTAMP_LTZ_MILLIS()) + .field("atime", DataTypes.TIMESTAMP_LTZ_MILLIS()) + .field("owner", DataTypes.STRING().nullable()) + .field("generation", DataTypes.INT().nullable()) + .field("content_type", DataTypes.STRING().nullable()) + .field("storage_class", DataTypes.STRING().nullable()) + .field("md5_hash", DataTypes.STRING().nullable()) + .field("metadata_mtime", DataTypes.TIMESTAMP_LTZ_MILLIS().nullable()) + .field("metadata", DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING())) + .build() + .notNull(); + + /** Object location in file system. */ + String objectLocation(); + + /** Underlying table to store metadata. */ + FileStoreTable underlyingTable(); + + long refresh(); + + @Override + ObjectTable copy(Map dynamicOptions); + + /** Create a new builder for {@link ObjectTable}. */ + static ObjectTable.Builder builder() { + return new ObjectTable.Builder(); + } + + /** Builder for {@link ObjectTable}. */ + class Builder { + + private FileStoreTable underlyingTable; + private String objectLocation; + + public ObjectTable.Builder underlyingTable(FileStoreTable underlyingTable) { + this.underlyingTable = underlyingTable; + checkArgument( + new HashSet<>(SCHEMA.getFields()) + .containsAll(underlyingTable.rowType().getFields()), + "Schema of Object Table should be %s, but is %s.", + SCHEMA, + underlyingTable.rowType()); + return this; + } + + public ObjectTable.Builder objectLocation(String objectLocation) { + this.objectLocation = objectLocation; + return this; + } + + public ObjectTable build() { + return new ObjectTableImpl(underlyingTable, objectLocation); + } + } + + /** An implementation for {@link ObjectTable}. */ + class ObjectTableImpl extends DelegatedFileStoreTable implements ObjectTable { + + private final String objectLocation; + + public ObjectTableImpl(FileStoreTable underlyingTable, String objectLocation) { + super(underlyingTable); + this.objectLocation = objectLocation; + } + + @Override + public BatchWriteBuilder newBatchWriteBuilder() { + throw new UnsupportedOperationException("Object table does not support Write."); + } + + @Override + public StreamWriteBuilder newStreamWriteBuilder() { + throw new UnsupportedOperationException("Object table does not support Write."); + } + + @Override + public TableWriteImpl newWrite(String commitUser) { + throw new UnsupportedOperationException("Object table does not support Write."); + } + + @Override + public TableWriteImpl newWrite(String commitUser, ManifestCacheFilter manifestFilter) { + throw new UnsupportedOperationException("Object table does not support Write."); + } + + @Override + public TableCommitImpl newCommit(String commitUser) { + throw new UnsupportedOperationException("Object table does not support Commit."); + } + + @Override + public String objectLocation() { + return objectLocation; + } + + @Override + public FileStoreTable underlyingTable() { + return wrapped; + } + + @Override + public long refresh() { + try { + return ObjectRefresh.refresh(this); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Override + public ObjectTable copy(Map dynamicOptions) { + return new ObjectTableImpl(wrapped.copy(dynamicOptions), objectLocation); + } + + @Override + public FileStoreTable copy(TableSchema newTableSchema) { + return new ObjectTableImpl(wrapped.copy(newTableSchema), objectLocation); + } + + @Override + public FileStoreTable copyWithoutTimeTravel(Map dynamicOptions) { + return new ObjectTableImpl( + wrapped.copyWithoutTimeTravel(dynamicOptions), objectLocation); + } + + @Override + public FileStoreTable copyWithLatestSchema() { + return new ObjectTableImpl(wrapped.copyWithLatestSchema(), objectLocation); + } + + @Override + public FileStoreTable switchToBranch(String branchName) { + return new ObjectTableImpl(wrapped.switchToBranch(branchName), objectLocation); + } + } +} diff --git a/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/HadoopCompliantFileIO.java b/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/HadoopCompliantFileIO.java index 4d86c12a6e52..67027eabadfb 100644 --- a/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/HadoopCompliantFileIO.java +++ b/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/HadoopCompliantFileIO.java @@ -286,5 +286,15 @@ public Path getPath() { public long getModificationTime() { return status.getModificationTime(); } + + @Override + public long getAccessTime() { + return status.getAccessTime(); + } + + @Override + public String getOwner() { + return status.getOwner(); + } } } diff --git a/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/HadoopCompliantFileIO.java b/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/HadoopCompliantFileIO.java index abfe0fabba61..80f3df582096 100644 --- a/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/HadoopCompliantFileIO.java +++ b/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/HadoopCompliantFileIO.java @@ -286,5 +286,15 @@ public Path getPath() { public long getModificationTime() { return status.getModificationTime(); } + + @Override + public long getAccessTime() { + return status.getAccessTime(); + } + + @Override + public String getOwner() { + return status.getOwner(); + } } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkFileIO.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkFileIO.java index db66379f3108..74512409bfc8 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkFileIO.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkFileIO.java @@ -227,5 +227,10 @@ public Path getPath() { public long getModificationTime() { return status.getModificationTime(); } + + @Override + public long getAccessTime() { + return status.getAccessTime(); + } } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RefreshObjectTableProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RefreshObjectTableProcedure.java new file mode 100644 index 000000000000..97eb3095f094 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RefreshObjectTableProcedure.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.procedure; + +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.table.object.ObjectTable; + +import org.apache.flink.table.annotation.ArgumentHint; +import org.apache.flink.table.annotation.DataTypeHint; +import org.apache.flink.table.annotation.ProcedureHint; +import org.apache.flink.table.procedure.ProcedureContext; +import org.apache.flink.types.Row; + +/** + * Refresh Object Table procedure. Usage: + * + *


+ *  CALL sys.refresh_object_table('tableId')
+ * 
+ */ +public class RefreshObjectTableProcedure extends ProcedureBase { + + private static final String IDENTIFIER = "refresh_object_table"; + + @ProcedureHint(argument = {@ArgumentHint(name = "table", type = @DataTypeHint("STRING"))}) + public @DataTypeHint("ROW") Row[] call( + ProcedureContext procedureContext, String tableId) + throws Catalog.TableNotExistException { + ObjectTable table = (ObjectTable) table(tableId); + long fileNumber = table.refresh(); + return new Row[] {Row.of(fileNumber)}; + } + + @Override + public String identifier() { + return IDENTIFIER; + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory index 6fe5e74ebe0d..0ff3ac1f1e1c 100644 --- a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory +++ b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory @@ -79,3 +79,4 @@ org.apache.paimon.flink.procedure.FastForwardProcedure org.apache.paimon.flink.procedure.MarkPartitionDoneProcedure org.apache.paimon.flink.procedure.CloneProcedure org.apache.paimon.flink.procedure.CompactManifestProcedure +org.apache.paimon.flink.procedure.RefreshObjectTableProcedure diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ObjectTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ObjectTableITCase.java new file mode 100644 index 000000000000..b9e30035b093 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ObjectTableITCase.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink; + +import org.apache.paimon.fs.FileIO; +import org.apache.paimon.fs.Path; +import org.apache.paimon.fs.local.LocalFileIO; + +import org.apache.flink.types.Row; +import org.junit.jupiter.api.Test; + +import java.io.IOException; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** ITCase for object table. */ +public class ObjectTableITCase extends CatalogITCaseBase { + + @Test + public void testIllegalObjectTable() { + assertThatThrownBy( + () -> + sql( + "CREATE TABLE T (a INT, b INT, c INT) WITH ('type' = 'object-table')")) + .rootCause() + .hasMessageContaining("Schema of Object Table can be empty or"); + assertThatThrownBy(() -> sql("CREATE TABLE T WITH ('type' = 'object-table')")) + .rootCause() + .hasMessageContaining("Object table should have object-location option."); + } + + @Test + public void testObjectTableRefresh() throws IOException { + Path objectLocation = new Path(path + "/object-location"); + FileIO fileIO = LocalFileIO.create(); + sql( + "CREATE TABLE T WITH ('type' = 'object-table', 'object-location' = '%s')", + objectLocation); + + // add new file + fileIO.overwriteFileUtf8(new Path(objectLocation, "f0"), "1,2,3"); + sql("CALL sys.refresh_object_table('default.T')"); + assertThat(sql("SELECT name, length FROM T")).containsExactlyInAnyOrder(Row.of("f0", 5L)); + + // add new file + fileIO.overwriteFileUtf8(new Path(objectLocation, "f1"), "4,5,6"); + sql("CALL sys.refresh_object_table('default.T')"); + assertThat(sql("SELECT name, length FROM T")) + .containsExactlyInAnyOrder(Row.of("f0", 5L), Row.of("f1", 5L)); + + // delete file + fileIO.deleteQuietly(new Path(objectLocation, "f0")); + sql("CALL sys.refresh_object_table('default.T')"); + assertThat(sql("SELECT name, length FROM T")).containsExactlyInAnyOrder(Row.of("f1", 5L)); + + // time travel + assertThat(sql("SELECT name, length FROM T /*+ OPTIONS('scan.snapshot-id' = '1') */")) + .containsExactlyInAnyOrder(Row.of("f0", 5L)); + + // insert into + assertThatThrownBy(() -> sql("INSERT INTO T SELECT * FROM T")) + .rootCause() + .hasMessageContaining("Object table does not support Write."); + assertThat(sql("SELECT name, length FROM T")).containsExactlyInAnyOrder(Row.of("f1", 5L)); + } +} diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java index c93aad41a732..35b65a7b530b 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java @@ -35,6 +35,7 @@ import org.apache.paimon.spark.procedure.MigrateTableProcedure; import org.apache.paimon.spark.procedure.Procedure; import org.apache.paimon.spark.procedure.ProcedureBuilder; +import org.apache.paimon.spark.procedure.RefreshObjectTableProcedure; import org.apache.paimon.spark.procedure.RemoveOrphanFilesProcedure; import org.apache.paimon.spark.procedure.RenameTagProcedure; import org.apache.paimon.spark.procedure.RepairProcedure; @@ -87,6 +88,7 @@ private static Map> initProcedureBuilders() { procedureBuilders.put("reset_consumer", ResetConsumerProcedure::builder); procedureBuilders.put("mark_partition_done", MarkPartitionDoneProcedure::builder); procedureBuilders.put("compact_manifest", CompactManifestProcedure::builder); + procedureBuilders.put("refresh_object_table", RefreshObjectTableProcedure::builder); return procedureBuilders.build(); } } diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RefreshObjectTableProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RefreshObjectTableProcedure.java new file mode 100644 index 000000000000..c6b6fdab4723 --- /dev/null +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RefreshObjectTableProcedure.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.procedure; + +import org.apache.paimon.table.object.ObjectTable; + +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.connector.catalog.Identifier; +import org.apache.spark.sql.connector.catalog.TableCatalog; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.Metadata; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; + +import static org.apache.spark.sql.types.DataTypes.StringType; + +/** Spark procedure to refresh Object Table. */ +public class RefreshObjectTableProcedure extends BaseProcedure { + + private static final ProcedureParameter[] PARAMETERS = + new ProcedureParameter[] {ProcedureParameter.required("table", StringType)}; + + private static final StructType OUTPUT_TYPE = + new StructType( + new StructField[] { + new StructField("file_number", DataTypes.LongType, false, Metadata.empty()) + }); + + protected RefreshObjectTableProcedure(TableCatalog tableCatalog) { + super(tableCatalog); + } + + @Override + public ProcedureParameter[] parameters() { + return PARAMETERS; + } + + @Override + public StructType outputType() { + return OUTPUT_TYPE; + } + + @Override + public InternalRow[] call(InternalRow args) { + Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name()); + return modifyPaimonTable( + tableIdent, + table -> { + ObjectTable objectTable = (ObjectTable) table; + long fileNumber = objectTable.refresh(); + InternalRow outputRow = newInternalRow(fileNumber); + return new InternalRow[] {outputRow}; + }); + } + + public static ProcedureBuilder builder() { + return new Builder() { + @Override + public RefreshObjectTableProcedure doBuild() { + return new RefreshObjectTableProcedure(tableCatalog()); + } + }; + } + + @Override + public String description() { + return "RefreshObjectTableProcedure"; + } +} diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/ObjectTableTest.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/ObjectTableTest.scala new file mode 100644 index 000000000000..3a446e33d8c8 --- /dev/null +++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/ObjectTableTest.scala @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.sql + +import org.apache.paimon.fs.Path +import org.apache.paimon.fs.local.LocalFileIO +import org.apache.paimon.spark.PaimonSparkTestBase + +import org.apache.spark.sql.Row + +class ObjectTableTest extends PaimonSparkTestBase { + + test(s"Paimon object table") { + val objectLocation = new Path(tempDBDir + "/object-location") + val fileIO = LocalFileIO.create + + spark.sql(s""" + |CREATE TABLE T TBLPROPERTIES ( + | 'type' = 'object-table', + | 'object-location' = '$objectLocation' + |) + |""".stripMargin) + + // add new file + fileIO.overwriteFileUtf8(new Path(objectLocation, "f0"), "1,2,3") + spark.sql("CALL sys.refresh_object_table('test.T')") + checkAnswer( + spark.sql("SELECT name, length FROM T"), + Row("f0", 5L) :: Nil + ) + + // add new file + fileIO.overwriteFileUtf8(new Path(objectLocation, "f1"), "4,5,6") + spark.sql("CALL sys.refresh_object_table('test.T')") + checkAnswer( + spark.sql("SELECT name, length FROM T"), + Row("f0", 5L) :: Row("f1", 5L) :: Nil + ) + + // time travel + checkAnswer( + spark.sql("SELECT name, length FROM T VERSION AS OF 1"), + Row("f0", 5L) :: Nil + ) + } +}