Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Introduce Object Table to manage unstructured files #4459

Merged
merged 6 commits into from
Nov 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 67 additions & 1 deletion docs/content/concepts/table-types.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ Paimon supports table types:
3. view: metastore required, views in SQL are a kind of virtual table
4. format-table: file format table refers to a directory that contains multiple files of the same format, where
operations on this table allow for reading or writing to these files, compatible with Hive tables
5. materialized-table: aimed at simplifying both batch and stream data pipelines, providing a consistent development
5. object table: provides metadata indexes for unstructured data objects in the specified Object Storage storage directory.
6. materialized-table: aimed at simplifying both batch and stream data pipelines, providing a consistent development
experience, see [Flink Materialized Table](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/materialized-table/overview/)

## Table with PK
Expand Down Expand Up @@ -169,6 +170,71 @@ CREATE TABLE my_parquet_table (

{{< /tabs >}}

## Object Table

Object Table provides metadata indexes for unstructured data objects in the specified Object Storage storage directory.
Object tables allow users to analyze unstructured data in Object Storage:

1. Use Python API to manipulate these unstructured data, such as converting images to PDF format.
2. Model functions can also be used to perform inference, and then the results of these operations can be concatenated
with other structured data in the Catalog.

The object table is managed by Catalog and can also have access permissions and the ability to manage blood relations.

{{< tabs "object-table" >}}

{{< tab "Flink-SQL" >}}

```sql
-- Create Object Table

CREATE TABLE `my_object_table` WITH (
'type' = 'object-table',
'object-location' = 'oss://my_bucket/my_location'
);

-- Refresh Object Table

CALL sys.refresh_object_table('mydb.my_object_table');

-- Query Object Table

SELECT * FROM `my_object_table`;

-- Query Object Table with Time Travel

SELECT * FROM `my_object_table` /*+ OPTIONS('scan.snapshot-id' = '1') */;
```

{{< /tab >}}

{{< tab "Spark-SQL" >}}

```sql
-- Create Object Table

CREATE TABLE `my_object_table` TBLPROPERTIES (
'type' = 'object-table',
'object-location' = 'oss://my_bucket/my_location'
);

-- Refresh Object Table

CALL sys.refresh_object_table('mydb.my_object_table');

-- Query Object Table

SELECT * FROM `my_object_table`;

-- Query Object Table with Time Travel

SELECT * FROM `my_object_table` VERSION AS OF 1;
```

{{< /tab >}}

{{< /tabs >}}

## Materialized Table

Materialized Table aimed at simplifying both batch and stream data pipelines, providing a consistent development
Expand Down
8 changes: 7 additions & 1 deletion docs/layouts/shortcodes/generated/core_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -527,6 +527,12 @@
<td>Integer</td>
<td>The number of sorted runs that trigger the stopping of writes, the default value is 'num-sorted-run.compaction-trigger' + 3.</td>
</tr>
<tr>
<td><h5>object-location</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>The object location for object table.</td>
</tr>
<tr>
<td><h5>page-size</h5></td>
<td style="word-wrap: break-word;">64 kb</td>
Expand Down Expand Up @@ -904,7 +910,7 @@
<td><h5>type</h5></td>
<td style="word-wrap: break-word;">table</td>
<td><p>Enum</p></td>
<td>Type of the table.<br /><br />Possible values:<ul><li>"table": Normal Paimon table.</li><li>"format-table": A file format table refers to a directory that contains multiple files of the same format.</li><li>"materialized-table": A materialized table.</li></ul></td>
<td>Type of the table.<br /><br />Possible values:<ul><li>"table": Normal Paimon table.</li><li>"format-table": A file format table refers to a directory that contains multiple files of the same format.</li><li>"materialized-table": A materialized table combines normal Paimon table and materialized SQL.</li><li>"object-table": A object table combines normal Paimon table and object location.</li></ul></td>
</tr>
<tr>
<td><h5>write-buffer-for-append</h5></td>
Expand Down
15 changes: 15 additions & 0 deletions paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -1405,6 +1405,12 @@ public class CoreOptions implements Serializable {
.withDescription(
"Whether to enable asynchronous IO writing when writing files.");

public static final ConfigOption<String> OBJECT_LOCATION =
key("object-location")
.stringType()
.noDefaultValue()
.withDescription("The object location for object table.");

@ExcludeFromDocumentation("Only used internally to support materialized table")
public static final ConfigOption<String> MATERIALIZED_TABLE_DEFINITION_QUERY =
key("materialized-table.definition-query")
Expand Down Expand Up @@ -1516,6 +1522,10 @@ public static Path path(Options options) {
return new Path(options.get(PATH));
}

public TableType type() {
return options.get(TYPE);
}

public String formatType() {
return normalizeFileFormat(options.get(FILE_FORMAT));
}
Expand Down Expand Up @@ -1565,6 +1575,11 @@ public static FileFormat createFileFormat(Options options, ConfigOption<String>
return FileFormat.fromIdentifier(formatIdentifier, options);
}

public String objectLocation() {
checkArgument(type() == TableType.OBJECT_TABLE, "Only object table has object location!");
return options.get(OBJECT_LOCATION);
}

public Map<Integer, String> fileCompressionPerLevel() {
Map<String, String> levelCompressions = options.get(FILE_COMPRESSION_PER_LEVEL);
return levelCompressions.entrySet().stream()
Expand Down
7 changes: 6 additions & 1 deletion paimon-common/src/main/java/org/apache/paimon/TableType.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,12 @@ public enum TableType implements DescribedEnum {
FORMAT_TABLE(
"format-table",
"A file format table refers to a directory that contains multiple files of the same format."),
MATERIALIZED_TABLE("materialized-table", "A materialized table.");
MATERIALIZED_TABLE(
"materialized-table",
"A materialized table combines normal Paimon table and materialized SQL."),
OBJECT_TABLE(
"object-table", "A object table combines normal Paimon table and object location.");

private final String value;
private final String description;

Expand Down
22 changes: 22 additions & 0 deletions paimon-common/src/main/java/org/apache/paimon/fs/FileStatus.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

import org.apache.paimon.annotation.Public;

import javax.annotation.Nullable;

/**
* Interface that represents the client side information for a file independent of the file system.
*
Expand Down Expand Up @@ -56,4 +58,24 @@ public interface FileStatus {
* milliseconds since the epoch (UTC January 1, 1970).
*/
long getModificationTime();

/**
* Get the last access time of the file.
*
* @return A long value representing the time the file was last accessed, measured in
* milliseconds since the epoch (UTC January 1, 1970).
*/
default long getAccessTime() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think getLatestAccessTime may better?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same to getModificationTime.

return 0;
}

/**
* Returns the owner of this file.
*
* @return the owner of this file
*/
@Nullable
default String getOwner() {
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,16 @@ public Path getPath() {
public long getModificationTime() {
return status.getModificationTime();
}

@Override
public long getAccessTime() {
return status.getAccessTime();
}

@Override
public String getOwner() {
return status.getOwner();
}
}

// ============================== extra methods ===================================
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,10 @@ public static LocalZonedTimestampType TIMESTAMP_WITH_LOCAL_TIME_ZONE(int precisi
return new LocalZonedTimestampType(precision);
}

public static LocalZonedTimestampType TIMESTAMP_LTZ_MILLIS() {
return new LocalZonedTimestampType(3);
}

public static DecimalType DECIMAL(int precision, int scale) {
return new DecimalType(precision, scale);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,11 +169,16 @@ public int defaultSize() {
}

@Override
public DataType copy(boolean isNullable) {
public RowType copy(boolean isNullable) {
return new RowType(
isNullable, fields.stream().map(DataField::copy).collect(Collectors.toList()));
}

@Override
public RowType notNull() {
return copy(false);
}

@Override
public String asSQLString() {
return withNullability(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,10 @@
import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.table.FormatTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.object.ObjectTable;
import org.apache.paimon.table.sink.BatchWriteBuilder;
import org.apache.paimon.table.system.SystemTableLoader;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.Preconditions;

import javax.annotation.Nullable;
Expand All @@ -48,6 +50,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -56,7 +59,6 @@

import static org.apache.paimon.CoreOptions.TYPE;
import static org.apache.paimon.CoreOptions.createCommitUser;
import static org.apache.paimon.TableType.FORMAT_TABLE;
import static org.apache.paimon.options.CatalogOptions.ALLOW_UPPER_CASE;
import static org.apache.paimon.options.CatalogOptions.LINEAGE_META;
import static org.apache.paimon.options.CatalogOptions.LOCK_ENABLED;
Expand Down Expand Up @@ -284,13 +286,35 @@ public void createTable(Identifier identifier, Schema schema, boolean ignoreIfEx

copyTableDefaultOptions(schema.options());

if (Options.fromMap(schema.options()).get(TYPE) == FORMAT_TABLE) {
createFormatTable(identifier, schema);
} else {
createTableImpl(identifier, schema);
switch (Options.fromMap(schema.options()).get(TYPE)) {
case TABLE:
case MATERIALIZED_TABLE:
createTableImpl(identifier, schema);
break;
case OBJECT_TABLE:
createObjectTable(identifier, schema);
break;
case FORMAT_TABLE:
createFormatTable(identifier, schema);
break;
}
}

private void createObjectTable(Identifier identifier, Schema schema) {
RowType rowType = schema.rowType();
checkArgument(
rowType.getFields().isEmpty()
|| new HashSet<>(ObjectTable.SCHEMA.getFields())
.containsAll(rowType.getFields()),
"Schema of Object Table can be empty or %s, but is %s.",
ObjectTable.SCHEMA,
rowType);
checkArgument(
schema.options().containsKey(CoreOptions.OBJECT_LOCATION.key()),
"Object table should have object-location option.");
createTableImpl(identifier, schema.copy(ObjectTable.SCHEMA));
}

protected abstract void createTableImpl(Identifier identifier, Schema schema);

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,10 @@ public String comment() {
return comment;
}

public Schema copy(RowType rowType) {
return new Schema(rowType.getFields(), partitionKeys, primaryKeys, options, comment);
}

private static List<DataField> normalizeFields(
List<DataField> fields, List<String> primaryKeys, List<String> partitionKeys) {
List<String> fieldNames = fields.stream().map(DataField::name).collect(Collectors.toList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@
package org.apache.paimon.table;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.TableType;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.object.ObjectTable;
import org.apache.paimon.utils.StringUtils;

import java.io.IOException;
Expand All @@ -33,6 +35,7 @@

import static org.apache.paimon.CoreOptions.PATH;
import static org.apache.paimon.utils.Preconditions.checkArgument;
import static org.apache.paimon.utils.Preconditions.checkNotNull;

/** Factory to create {@link FileStoreTable}. */
public class FileStoreTableFactory {
Expand Down Expand Up @@ -124,6 +127,17 @@ public static FileStoreTable createWithoutFallbackBranch(
fileIO, tablePath, tableSchema, catalogEnvironment)
: new PrimaryKeyFileStoreTable(
fileIO, tablePath, tableSchema, catalogEnvironment);
return table.copy(dynamicOptions.toMap());
table = table.copy(dynamicOptions.toMap());
CoreOptions options = table.coreOptions();
if (options.type() == TableType.OBJECT_TABLE) {
String objectLocation = options.objectLocation();
checkNotNull(objectLocation, "Object location should not be null for object table.");
table =
ObjectTable.builder()
.underlyingTable(table)
.objectLocation(objectLocation)
.build();
}
return table;
}
}
Loading
Loading