From c5a950e3fd214949f329d836d5d4828d810ad434 Mon Sep 17 00:00:00 2001 From: Jingsong Lee Date: Mon, 26 Aug 2024 15:21:43 +0800 Subject: [PATCH] [core] Introduce Format table to support csv,orc,parquet tables (#4053) --- .../generated/hive_catalog_configuration.html | 6 + .../paimon/catalog/AbstractCatalog.java | 18 +- .../org/apache/paimon/table/FormatTable.java | 317 ++++++++++++++++++ .../paimon/table/FormatTableOptions.java | 32 ++ .../org/apache/paimon/flink/FlinkCatalog.java | 10 + .../paimon/flink/FlinkTableFactory.java | 9 + .../paimon/flink/FormatCatalogTable.java | 144 ++++++++ .../org/apache/paimon/hive/HiveCatalog.java | 59 +++- .../paimon/hive/HiveCatalogOptions.java | 9 + .../paimon/hive/HiveFormatTableUtils.java | 118 +++++++ paimon-hive/paimon-hive-connector-2.3/pom.xml | 7 + .../hive/Hive23CatalogFormatTableITCase.java | 52 +++ paimon-hive/paimon-hive-connector-3.1/pom.xml | 7 + .../hive/Hive31CatalogFormatTableITCase.java | 45 +++ .../HiveCatalogFormatTableITCaseBase.java | 165 +++++++++ 15 files changed, 993 insertions(+), 5 deletions(-) create mode 100644 paimon-core/src/main/java/org/apache/paimon/table/FormatTable.java create mode 100644 paimon-core/src/main/java/org/apache/paimon/table/FormatTableOptions.java create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FormatCatalogTable.java create mode 100644 paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveFormatTableUtils.java create mode 100644 paimon-hive/paimon-hive-connector-2.3/src/test/java/org/apache/paimon/hive/Hive23CatalogFormatTableITCase.java create mode 100644 paimon-hive/paimon-hive-connector-3.1/src/test/java/org/apache/paimon/hive/Hive31CatalogFormatTableITCase.java create mode 100644 paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogFormatTableITCaseBase.java diff --git a/docs/layouts/shortcodes/generated/hive_catalog_configuration.html b/docs/layouts/shortcodes/generated/hive_catalog_configuration.html index e0257d301b6f..076a46232c3c 100644 --- a/docs/layouts/shortcodes/generated/hive_catalog_configuration.html +++ b/docs/layouts/shortcodes/generated/hive_catalog_configuration.html @@ -39,6 +39,12 @@ String Specify client cache key, multiple elements separated by commas.
+        <tr>
+            <td><h5>format-table.enabled</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>Whether to support format tables. A format table corresponds to a regular Hive table, allowing read and write operations; however, these operations do not connect to the metastore, so newly added partitions will not be reflected in the metastore and need to be added manually as separate partition operations.</td>
+        </tr>
hadoop-conf-dir
(none) diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java index 365887113d1e..dab9508282fa 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java @@ -34,6 +34,7 @@ import org.apache.paimon.table.CatalogEnvironment; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.FileStoreTableFactory; +import org.apache.paimon.table.FormatTable; import org.apache.paimon.table.Table; import org.apache.paimon.table.sink.BatchWriteBuilder; import org.apache.paimon.table.system.SystemTableLoader; @@ -336,7 +337,11 @@ public Table getTable(Identifier identifier) throws TableNotExistException { } return table; } else { - return getDataTable(identifier); + try { + return getDataTable(identifier); + } catch (TableNotExistException e) { + return getFormatTable(identifier); + } } } @@ -355,6 +360,17 @@ private FileStoreTable getDataTable(Identifier identifier) throws TableNotExistE lineageMetaFactory)); } + /** + * Return a {@link FormatTable} identified by the given {@link Identifier}. + * + * @param identifier Path of the table + * @return The requested table + * @throws Catalog.TableNotExistException if the target does not exist + */ + public FormatTable getFormatTable(Identifier identifier) throws Catalog.TableNotExistException { + throw new Catalog.TableNotExistException(identifier); + } + /** * Get warehouse path for specified database. If a catalog would like to provide individual path * for each database, this method can be `Override` in that catalog. diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FormatTable.java b/paimon-core/src/main/java/org/apache/paimon/table/FormatTable.java new file mode 100644 index 000000000000..2ec38a70d247 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/table/FormatTable.java @@ -0,0 +1,317 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.table; + +import org.apache.paimon.Snapshot; +import org.apache.paimon.annotation.Public; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.manifest.ManifestEntry; +import org.apache.paimon.manifest.ManifestFileMeta; +import org.apache.paimon.stats.Statistics; +import org.apache.paimon.table.sink.BatchWriteBuilder; +import org.apache.paimon.table.sink.StreamWriteBuilder; +import org.apache.paimon.table.source.ReadBuilder; +import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.SimpleFileReader; + +import javax.annotation.Nullable; + +import java.time.Duration; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.OptionalLong; + +/** + * A file format table refers to a directory that contains multiple files of the same format, where + * operations on this table allow for reading or writing to these files, facilitating the retrieval + * of existing data and the addition of new files. + * + *
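+ * <p>For illustration, a CSV format table simply points at a directory of same-format files;
+ * the layout below is a hypothetical example (paths are not prescribed by Paimon):
+ *
+ * <pre>
+ * /warehouse/my_db.db/my_csv_table/part-00000.csv
+ * /warehouse/my_db.db/my_csv_table/part-00001.csv
+ * </pre>
+ *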
+ * <p>
Partitioned file format table just like the standard hive format. Partitions are discovered + * and inferred based on directory structure. + * + * @since 0.9.0 + */ +@Public +public interface FormatTable extends Table { + + /** Directory location in file system. */ + String location(); + + /** Format of this table. */ + Format format(); + + @Override + FormatTable copy(Map dynamicOptions); + + /** Currently supported formats. */ + enum Format { + ORC, + PARQUET, + CSV + } + + /** Create a new builder for {@link FormatTable}. */ + static FormatTable.Builder builder() { + return new FormatTable.Builder(); + } + + /** Builder for {@link FormatTable}. */ + class Builder { + + private Identifier identifier; + private RowType rowType; + private List partitionKeys; + private String location; + private FormatTable.Format format; + private Map options; + @Nullable private String comment; + + public FormatTable.Builder identifier(Identifier identifier) { + this.identifier = identifier; + return this; + } + + public FormatTable.Builder rowType(RowType rowType) { + this.rowType = rowType; + return this; + } + + public FormatTable.Builder partitionKeys(List partitionKeys) { + this.partitionKeys = partitionKeys; + return this; + } + + public FormatTable.Builder location(String location) { + this.location = location; + return this; + } + + public FormatTable.Builder format(FormatTable.Format format) { + this.format = format; + return this; + } + + public FormatTable.Builder options(Map options) { + this.options = options; + return this; + } + + public FormatTable.Builder comment(@Nullable String comment) { + this.comment = comment; + return this; + } + + public FormatTable build() { + return new FormatTable.FormatTableImpl( + identifier, rowType, partitionKeys, location, format, options, comment); + } + } + + /** An implementation for {@link FormatTable}. 
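+ *
+ * <p>Instances are typically created through {@link FormatTable#builder()}; the following is a
+ * minimal, illustrative sketch (identifier, schema and location are made up):
+ *
+ * <pre>{@code
+ * FormatTable table =
+ *     FormatTable.builder()
+ *         .identifier(Identifier.create("my_db", "my_csv_table"))
+ *         .rowType(RowType.of(DataTypes.INT(), DataTypes.STRING()))
+ *         .partitionKeys(Collections.emptyList())
+ *         .location("/warehouse/my_db.db/my_csv_table")
+ *         .format(Format.CSV)
+ *         .options(Collections.emptyMap())
+ *         .comment(null)
+ *         .build();
+ * }</pre>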
*/ + class FormatTableImpl implements FormatTable { + + private final Identifier identifier; + private final RowType rowType; + private final List partitionKeys; + private final String location; + private final Format format; + private final Map options; + @Nullable private final String comment; + + public FormatTableImpl( + Identifier identifier, + RowType rowType, + List partitionKeys, + String location, + Format format, + Map options, + @Nullable String comment) { + this.identifier = identifier; + this.rowType = rowType; + this.partitionKeys = partitionKeys; + this.location = location; + this.format = format; + this.options = options; + this.comment = comment; + } + + @Override + public String name() { + return identifier.getTableName(); + } + + @Override + public String fullName() { + return identifier.getFullName(); + } + + @Override + public RowType rowType() { + return rowType; + } + + @Override + public List partitionKeys() { + return partitionKeys; + } + + @Override + public List primaryKeys() { + return Collections.emptyList(); + } + + @Override + public String location() { + return location; + } + + @Override + public Format format() { + return format; + } + + @Override + public Map options() { + return options; + } + + @Override + public Optional comment() { + return Optional.ofNullable(comment); + } + + @Override + public FormatTable copy(Map dynamicOptions) { + Map newOptions = new HashMap<>(options); + newOptions.putAll(dynamicOptions); + return new FormatTableImpl( + identifier, rowType, partitionKeys, location, format, newOptions, comment); + } + } + + // ===================== Unsupported =============================== + + @Override + default Optional statistics() { + return Optional.empty(); + } + + @Override + default OptionalLong latestSnapshotId() { + throw new UnsupportedOperationException(); + } + + @Override + default Snapshot snapshot(long snapshotId) { + throw new UnsupportedOperationException(); + } + + @Override + default SimpleFileReader manifestListReader() { + throw new UnsupportedOperationException(); + } + + @Override + default SimpleFileReader manifestFileReader() { + throw new UnsupportedOperationException(); + } + + @Override + default void rollbackTo(long snapshotId) { + throw new UnsupportedOperationException(); + } + + @Override + default void createTag(String tagName, long fromSnapshotId) { + throw new UnsupportedOperationException(); + } + + @Override + default void createTag(String tagName, long fromSnapshotId, Duration timeRetained) { + throw new UnsupportedOperationException(); + } + + @Override + default void createTag(String tagName) { + throw new UnsupportedOperationException(); + } + + @Override + default void createTag(String tagName, Duration timeRetained) { + throw new UnsupportedOperationException(); + } + + @Override + default void deleteTag(String tagName) { + throw new UnsupportedOperationException(); + } + + @Override + default void rollbackTo(String tagName) { + throw new UnsupportedOperationException(); + } + + @Override + default void createBranch(String branchName) { + throw new UnsupportedOperationException(); + } + + @Override + default void createBranch(String branchName, String tagName) { + throw new UnsupportedOperationException(); + } + + @Override + default void deleteBranch(String branchName) { + throw new UnsupportedOperationException(); + } + + @Override + default void fastForward(String branchName) { + throw new UnsupportedOperationException(); + } + + @Override + default ExpireSnapshots newExpireSnapshots() { + throw 
new UnsupportedOperationException(); + } + + @Override + default ExpireSnapshots newExpireChangelog() { + throw new UnsupportedOperationException(); + } + + @Override + default ReadBuilder newReadBuilder() { + throw new UnsupportedOperationException(); + } + + @Override + default BatchWriteBuilder newBatchWriteBuilder() { + throw new UnsupportedOperationException(); + } + + @Override + default StreamWriteBuilder newStreamWriteBuilder() { + throw new UnsupportedOperationException(); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FormatTableOptions.java b/paimon-core/src/main/java/org/apache/paimon/table/FormatTableOptions.java new file mode 100644 index 000000000000..64f134d07588 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/table/FormatTableOptions.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.table; + +import org.apache.paimon.options.ConfigOption; +import org.apache.paimon.options.ConfigOptions; + +/** Options of {@link FormatTable}. 
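+ *
+ * <p>A hedged sketch of reading the delimiter back from a table's option map ({@code table} is
+ * an arbitrary {@link FormatTable}):
+ *
+ * <pre>{@code
+ * Options options = Options.fromMap(table.options());
+ * String delimiter = options.get(FormatTableOptions.FIELD_DELIMITER); // "," unless overridden
+ * }</pre>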
*/ +public class FormatTableOptions { + + public static final ConfigOption FIELD_DELIMITER = + ConfigOptions.key("csv.field-delimiter") + .stringType() + .defaultValue(",") + .withDescription("Optional field delimiter character (',' by default)"); +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java index 7765215353a3..d8bd2343979f 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java @@ -30,6 +30,7 @@ import org.apache.paimon.schema.SchemaChange; import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.FormatTable; import org.apache.paimon.table.Table; import org.apache.paimon.table.source.ReadBuilder; import org.apache.paimon.utils.FileStorePathFactory; @@ -258,6 +259,15 @@ private CatalogTable getTable(ObjectPath tablePath, @Nullable Long timestamp) throw new TableNotExistException(getName(), tablePath); } + if (table instanceof FormatTable) { + if (timestamp != null) { + throw new UnsupportedOperationException( + String.format( + "Format table %s cannot support as of timestamp.", tablePath)); + } + return new FormatCatalogTable((FormatTable) table); + } + if (timestamp != null) { Options options = new Options(); options.set(CoreOptions.SCAN_TIMESTAMP_MILLIS, timestamp); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkTableFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkTableFactory.java index 1617b4cc3327..96c81fdb720c 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkTableFactory.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkTableFactory.java @@ -24,6 +24,7 @@ import org.apache.paimon.options.Options; import org.apache.paimon.schema.SchemaManager; +import org.apache.flink.table.catalog.CatalogTable; import org.apache.flink.table.catalog.ResolvedCatalogTable; import org.apache.flink.table.connector.sink.DynamicTableSink; import org.apache.flink.table.connector.source.DynamicTableSource; @@ -42,12 +43,20 @@ public String factoryIdentifier() { @Override public DynamicTableSource createDynamicTableSource(Context context) { + CatalogTable table = context.getCatalogTable().getOrigin(); + if (table instanceof FormatCatalogTable) { + return ((FormatCatalogTable) table).createTableSource(context); + } createTableIfNeeded(context); return super.createDynamicTableSource(context); } @Override public DynamicTableSink createDynamicTableSink(Context context) { + CatalogTable table = context.getCatalogTable().getOrigin(); + if (table instanceof FormatCatalogTable) { + return ((FormatCatalogTable) table).createTableSink(context); + } createTableIfNeeded(context); return super.createDynamicTableSink(context); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FormatCatalogTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FormatCatalogTable.java new file mode 100644 index 000000000000..95aff5d84796 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FormatCatalogTable.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink; + +import org.apache.paimon.table.FormatTable; + +import org.apache.flink.connector.file.table.FileSystemTableFactory; +import org.apache.flink.table.api.Schema; +import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.factories.DynamicTableFactory; +import org.apache.flink.table.factories.FactoryUtil; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import static org.apache.flink.connector.file.table.FileSystemConnectorOptions.PATH; +import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR; +import static org.apache.flink.table.factories.FactoryUtil.FORMAT; +import static org.apache.flink.table.types.utils.TypeConversions.fromLogicalToDataType; +import static org.apache.paimon.flink.LogicalTypeConversion.toLogicalType; + +/** A {@link CatalogTable} to represent format table. 
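+ *
+ * <p>{@code getOptions()} rewrites the table metadata into options for Flink's built-in
+ * filesystem connector; for a CSV table the result is roughly (location illustrative):
+ *
+ * <pre>
+ * 'connector' = 'filesystem'
+ * 'path'      = '/warehouse/my_db.db/my_csv_table'
+ * 'format'    = 'csv'
+ * </pre>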
*/ +public class FormatCatalogTable implements CatalogTable { + + private final FormatTable table; + + private Map cachedOptions; + + public FormatCatalogTable(FormatTable table) { + this.table = table; + } + + public FormatTable table() { + return table; + } + + @Override + public Schema getUnresolvedSchema() { + return Schema.newBuilder() + .fromRowDataType(fromLogicalToDataType(toLogicalType(table.rowType()))) + .build(); + } + + @Override + public boolean isPartitioned() { + return !table.partitionKeys().isEmpty(); + } + + @Override + public List getPartitionKeys() { + return table.partitionKeys(); + } + + @Override + public CatalogTable copy(Map map) { + return new FormatCatalogTable(table.copy(map)); + } + + @Override + public Map getOptions() { + if (cachedOptions == null) { + cachedOptions = new HashMap<>(); + FileSystemTableFactory fileSystemFactory = new FileSystemTableFactory(); + Set validOptions = new HashSet<>(); + fileSystemFactory.requiredOptions().forEach(o -> validOptions.add(o.key())); + fileSystemFactory.optionalOptions().forEach(o -> validOptions.add(o.key())); + String format = table.format().name().toLowerCase(); + table.options() + .forEach( + (k, v) -> { + if (validOptions.contains(k) || k.startsWith(format + ".")) { + cachedOptions.put(k, v); + } + }); + cachedOptions.put(CONNECTOR.key(), "filesystem"); + cachedOptions.put(PATH.key(), table.location()); + cachedOptions.put(FORMAT.key(), format); + } + return cachedOptions; + } + + @Override + public String getComment() { + return table.comment().orElse(""); + } + + @Override + public CatalogTable copy() { + return copy(Collections.emptyMap()); + } + + @Override + public Optional getDescription() { + return table.comment(); + } + + @Override + public Optional getDetailedDescription() { + return getDescription(); + } + + public DynamicTableSource createTableSource(DynamicTableFactory.Context context) { + return FactoryUtil.createDynamicTableSource( + null, + context.getObjectIdentifier(), + context.getCatalogTable(), + context.getConfiguration(), + context.getClassLoader(), + context.isTemporary()); + } + + public DynamicTableSink createTableSink(DynamicTableFactory.Context context) { + return FactoryUtil.createDynamicTableSink( + null, + context.getObjectIdentifier(), + context.getCatalogTable(), + context.getConfiguration(), + context.getClassLoader(), + context.isTemporary()); + } +} diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java index 5da73341b857..12307521f882 100644 --- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java +++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java @@ -41,6 +41,7 @@ import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.schema.TableSchema; import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.FormatTable; import org.apache.paimon.table.TableType; import org.apache.paimon.types.DataField; import org.apache.paimon.types.DataTypes; @@ -85,6 +86,7 @@ import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREWAREHOUSE; import static org.apache.paimon.hive.HiveCatalogLock.acquireTimeout; import static org.apache.paimon.hive.HiveCatalogLock.checkMaxSleep; +import static org.apache.paimon.hive.HiveCatalogOptions.FORMAT_TABLE_ENABLED; import static org.apache.paimon.hive.HiveCatalogOptions.HADOOP_CONF_DIR; import static 
org.apache.paimon.hive.HiveCatalogOptions.HIVE_CONF_DIR; import static org.apache.paimon.hive.HiveCatalogOptions.IDENTIFIER; @@ -156,6 +158,10 @@ public HiveCatalog( this.clients = new CachedClientPool(hiveConf, options, clientClassName); } + private boolean formatTableEnabled() { + return options.get(FORMAT_TABLE_ENABLED); + } + @Override public Optional defaultLockFactory() { return Optional.of(new HiveCatalogLockFactory()); @@ -415,10 +421,26 @@ public boolean tableExists(Identifier identifier) { "Interrupted in call to tableExists " + identifier.getFullName(), e); } - return isPaimonTable(table) - && tableSchemaInFileSystem( - getTableLocation(identifier), identifier.getBranchNameOrDefault()) - .isPresent(); + boolean isDataTable = + isPaimonTable(table) + && tableSchemaInFileSystem( + getTableLocation(identifier), + identifier.getBranchNameOrDefault()) + .isPresent(); + if (isDataTable) { + return true; + } + + if (formatTableEnabled()) { + try { + HiveFormatTableUtils.convertToFormatTable(table); + return true; + } catch (UnsupportedOperationException e) { + return false; + } + } + + return false; } private static boolean isPaimonTable(Table table) { @@ -439,6 +461,35 @@ public TableSchema getDataTableSchema(Identifier identifier) throws TableNotExis .orElseThrow(() -> new TableNotExistException(identifier)); } + @Override + public FormatTable getFormatTable(Identifier identifier) throws TableNotExistException { + if (!formatTableEnabled()) { + throw new TableNotExistException(identifier); + } + + Table table; + try { + table = + clients.run( + client -> + client.getTable( + identifier.getDatabaseName(), + identifier.getTableName())); + } catch (NoSuchObjectException e) { + throw new TableNotExistException(identifier); + } catch (TException e) { + throw new RuntimeException(e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + try { + return HiveFormatTableUtils.convertToFormatTable(table); + } catch (UnsupportedOperationException e) { + throw new TableNotExistException(identifier); + } + } + private boolean usingExternalTable() { TableType tableType = OptionsUtils.convertToEnum( diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogOptions.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogOptions.java index 38f73bc6bd65..c74fa447ea46 100644 --- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogOptions.java +++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogOptions.java @@ -85,5 +85,14 @@ public final class HiveCatalogOptions { + "E.g. specifying \"conf:a.b.c\" will add \"a.b.c\" to the key, and so that configurations with different default catalog wouldn't share the same client pool. Multiple conf elements can be specified.")) .build()); + public static final ConfigOption FORMAT_TABLE_ENABLED = + ConfigOptions.key("format-table.enabled") + .booleanType() + .defaultValue(false) + .withDescription( + "Whether to support format tables, format table corresponds to a regular Hive table, allowing read and write operations. 
" + + "However, during these processes, it does not connect to the metastore; hence, newly added partitions will not be reflected in" + + " the metastore and need to be manually added as separate partition operations."); + private HiveCatalogOptions() {} } diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveFormatTableUtils.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveFormatTableUtils.java new file mode 100644 index 000000000000..dcb7c7ee74cb --- /dev/null +++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveFormatTableUtils.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.hive; + +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.table.FormatTable; +import org.apache.paimon.table.FormatTable.Format; +import org.apache.paimon.types.DataType; +import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.Pair; + +import org.apache.hadoop.hive.metastore.TableType; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.SerDeInfo; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.hadoop.hive.serde.serdeConstants.FIELD_DELIM; +import static org.apache.paimon.catalog.Catalog.COMMENT_PROP; +import static org.apache.paimon.table.FormatTableOptions.FIELD_DELIMITER; + +class HiveFormatTableUtils { + + public static FormatTable convertToFormatTable(Table hiveTable) { + if (TableType.valueOf(hiveTable.getTableType()) == TableType.VIRTUAL_VIEW) { + throw new UnsupportedOperationException("Hive view is not supported."); + } + + Identifier identifier = new Identifier(hiveTable.getDbName(), hiveTable.getTableName()); + Map options = new HashMap<>(hiveTable.getParameters()); + List partitionKeys = getFieldNames(hiveTable.getPartitionKeys()); + RowType rowType = createRowType(hiveTable.getSd().getCols(), hiveTable.getPartitionKeys()); + String comment = options.remove(COMMENT_PROP); + String location = hiveTable.getSd().getLocation(); + Format format; + SerDeInfo serdeInfo = hiveTable.getSd().getSerdeInfo(); + String serLib = serdeInfo.getSerializationLib().toLowerCase(); + String inputFormat = hiveTable.getSd().getInputFormat(); + if (serLib.contains("parquet")) { + format = Format.PARQUET; + } else if (serLib.contains("orc")) { + format = Format.ORC; + } else if (inputFormat.contains("Text")) { + format = Format.CSV; + // hive default field delimiter is '\u0001' + options.put( + FIELD_DELIMITER.key(), + 
serdeInfo.getParameters().getOrDefault(FIELD_DELIM, "\u0001")); + } else { + throw new UnsupportedOperationException("Unsupported table: " + hiveTable); + } + return FormatTable.builder() + .identifier(identifier) + .rowType(rowType) + .partitionKeys(partitionKeys) + .location(location) + .format(format) + .options(options) + .comment(comment) + .build(); + } + + /** Get field names from field schemas. */ + private static List getFieldNames(List fieldSchemas) { + List names = new ArrayList<>(fieldSchemas.size()); + for (FieldSchema fs : fieldSchemas) { + names.add(fs.getName()); + } + return names; + } + + /** Create a Paimon's Schema from Hive table's columns and partition keys. */ + private static RowType createRowType( + List nonPartCols, List partitionKeys) { + List allCols = new ArrayList<>(nonPartCols); + allCols.addAll(partitionKeys); + Pair columnInformation = extractColumnInformation(allCols); + return RowType.builder() + .fields(columnInformation.getRight(), columnInformation.getLeft()) + .build(); + } + + private static Pair extractColumnInformation(List allCols) { + String[] colNames = new String[allCols.size()]; + DataType[] colTypes = new DataType[allCols.size()]; + + for (int i = 0; i < allCols.size(); i++) { + FieldSchema fs = allCols.get(i); + colNames[i] = fs.getName(); + colTypes[i] = + HiveTypeUtils.toPaimonType( + TypeInfoUtils.getTypeInfoFromTypeString(fs.getType())); + } + + return Pair.of(colNames, colTypes); + } +} diff --git a/paimon-hive/paimon-hive-connector-2.3/pom.xml b/paimon-hive/paimon-hive-connector-2.3/pom.xml index 34c6c2d18f66..2e6cba83cec0 100644 --- a/paimon-hive/paimon-hive-connector-2.3/pom.xml +++ b/paimon-hive/paimon-hive-connector-2.3/pom.xml @@ -106,6 +106,13 @@ under the License. + + org.apache.flink + flink-csv + ${test.flink.version} + test + + org.apache.flink flink-connector-test-utils diff --git a/paimon-hive/paimon-hive-connector-2.3/src/test/java/org/apache/paimon/hive/Hive23CatalogFormatTableITCase.java b/paimon-hive/paimon-hive-connector-2.3/src/test/java/org/apache/paimon/hive/Hive23CatalogFormatTableITCase.java new file mode 100644 index 000000000000..d2e277dd2272 --- /dev/null +++ b/paimon-hive/paimon-hive-connector-2.3/src/test/java/org/apache/paimon/hive/Hive23CatalogFormatTableITCase.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.hive; + +import com.klarna.hiverunner.annotations.HiveRunnerSetup; +import com.klarna.hiverunner.config.HiveRunnerConfig; +import org.apache.hadoop.hive.ql.lockmgr.DbTxnManager; +import org.junit.Test; + +import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_IN_TEST; +import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY; +import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_TXN_MANAGER; + +/** IT cases for using Paimon {@link HiveCatalog} together with Paimon Hive 2.3 connector. */ +public class Hive23CatalogFormatTableITCase extends HiveCatalogFormatTableITCaseBase { + + @HiveRunnerSetup + private static final HiveRunnerConfig CONFIG = + new HiveRunnerConfig() { + { + // catalog lock needs txn manager + // hive-3.x requires a proper txn manager to create ACID table + getHiveConfSystemOverride() + .put(HIVE_TXN_MANAGER.varname, DbTxnManager.class.getName()); + getHiveConfSystemOverride().put(HIVE_SUPPORT_CONCURRENCY.varname, "true"); + // tell TxnHandler to prepare txn DB + getHiveConfSystemOverride().put(HIVE_IN_TEST.varname, "true"); + } + }; + + @Override + @Test + public void testPartitionTable() { + // Need to specify partition columns because the destination table is partitioned. + } +} diff --git a/paimon-hive/paimon-hive-connector-3.1/pom.xml b/paimon-hive/paimon-hive-connector-3.1/pom.xml index 4863c8b4fdf4..a46c2bc182d6 100644 --- a/paimon-hive/paimon-hive-connector-3.1/pom.xml +++ b/paimon-hive/paimon-hive-connector-3.1/pom.xml @@ -100,6 +100,13 @@ under the License. test + + org.apache.flink + flink-csv + ${test.flink.version} + test + + org.apache.flink flink-orc diff --git a/paimon-hive/paimon-hive-connector-3.1/src/test/java/org/apache/paimon/hive/Hive31CatalogFormatTableITCase.java b/paimon-hive/paimon-hive-connector-3.1/src/test/java/org/apache/paimon/hive/Hive31CatalogFormatTableITCase.java new file mode 100644 index 000000000000..6563759ce740 --- /dev/null +++ b/paimon-hive/paimon-hive-connector-3.1/src/test/java/org/apache/paimon/hive/Hive31CatalogFormatTableITCase.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.hive; + +import com.klarna.hiverunner.annotations.HiveRunnerSetup; +import com.klarna.hiverunner.config.HiveRunnerConfig; +import org.apache.hadoop.hive.ql.lockmgr.DbTxnManager; + +import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_IN_TEST; +import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY; +import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_TXN_MANAGER; + +/** IT cases for using Paimon {@link HiveCatalog} together with Paimon Hive 3.1 connector. 
*/ +public class Hive31CatalogFormatTableITCase extends HiveCatalogFormatTableITCaseBase { + + @HiveRunnerSetup + private static final HiveRunnerConfig CONFIG = + new HiveRunnerConfig() { + { + // catalog lock needs txn manager + // hive-3.x requires a proper txn manager to create ACID table + getHiveConfSystemOverride() + .put(HIVE_TXN_MANAGER.varname, DbTxnManager.class.getName()); + getHiveConfSystemOverride().put(HIVE_SUPPORT_CONCURRENCY.varname, "true"); + // tell TxnHandler to prepare txn DB + getHiveConfSystemOverride().put(HIVE_IN_TEST.varname, "true"); + } + }; +} diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogFormatTableITCaseBase.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogFormatTableITCaseBase.java new file mode 100644 index 000000000000..9a7ff1e586a1 --- /dev/null +++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogFormatTableITCaseBase.java @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.hive; + +import org.apache.paimon.hive.runner.PaimonEmbeddedHiveRunner; + +import com.klarna.hiverunner.HiveShell; +import com.klarna.hiverunner.annotations.HiveSQL; +import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.api.internal.TableEnvironmentImpl; +import org.apache.flink.types.Row; +import org.apache.flink.util.CloseableIterator; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.rules.TestRule; +import org.junit.runner.RunWith; +import org.junit.runners.model.Statement; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.apache.paimon.hive.HiveCatalogOptions.FORMAT_TABLE_ENABLED; +import static org.assertj.core.api.Assertions.assertThat; + +/** IT cases for using Paimon {@link HiveCatalog}. 
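+ *
+ * <p>Each test runs against a Paimon catalog with format tables enabled, created roughly as
+ * follows (see {@code before(boolean)}; the warehouse path is illustrative):
+ *
+ * <pre>{@code
+ * CREATE CATALOG my_hive WITH (
+ *   'type' = 'paimon',
+ *   'metastore' = 'hive',
+ *   'warehouse' = '/tmp/warehouse',
+ *   'format-table.enabled' = 'true'
+ * );
+ * }</pre>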
*/ +@RunWith(PaimonEmbeddedHiveRunner.class) +public abstract class HiveCatalogFormatTableITCaseBase { + + @Rule public TemporaryFolder folder = new TemporaryFolder(); + + protected String path; + protected TableEnvironment tEnv; + + @HiveSQL(files = {}) + protected static HiveShell hiveShell; + + private void before(boolean locationInProperties) throws Exception { + this.path = folder.newFolder().toURI().toString(); + Map options = new HashMap<>(); + options.put("type", "paimon"); + options.put("metastore", "hive"); + options.put("uri", ""); + options.put("lock.enabled", "true"); + options.put("location-in-properties", String.valueOf(locationInProperties)); + options.put("warehouse", path); + options.put(FORMAT_TABLE_ENABLED.key(), "true"); + tEnv = TableEnvironmentImpl.create(EnvironmentSettings.newInstance().inBatchMode().build()); + tEnv.executeSql( + String.join( + "\n", + "CREATE CATALOG my_hive WITH (", + options.entrySet().stream() + .map( + e -> + String.format( + "'%s' = '%s'", + e.getKey(), e.getValue())) + .collect(Collectors.joining(",\n")), + ")")) + .await(); + + tEnv.executeSql("USE CATALOG my_hive").await(); + tEnv.executeSql("DROP DATABASE IF EXISTS test_db CASCADE"); + tEnv.executeSql("CREATE DATABASE test_db").await(); + tEnv.executeSql("USE test_db").await(); + + hiveShell.execute("USE test_db"); + } + + private void after() { + hiveShell.execute("DROP DATABASE IF EXISTS test_db CASCADE"); + } + + @Target(ElementType.METHOD) + @Retention(RetentionPolicy.RUNTIME) + private @interface LocationInProperties {} + + @Rule + public TestRule environmentRule = + (base, description) -> + new Statement() { + @Override + public void evaluate() throws Throwable { + try { + before( + description.getAnnotation(LocationInProperties.class) + != null); + base.evaluate(); + } finally { + after(); + } + } + }; + + @Test + public void testCsvFormatTable() throws Exception { + hiveShell.execute("CREATE TABLE csv_table (a INT, b STRING)"); + doTestFormatTable("csv_table"); + } + + @Test + public void testCsvFormatTableWithDelimiter() throws Exception { + hiveShell.execute( + "CREATE TABLE csv_table_delimiter (a INT, b STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY ';'"); + doTestFormatTable("csv_table_delimiter"); + } + + @Test + public void testPartitionTable() throws Exception { + hiveShell.execute("CREATE TABLE partition_table (a INT) PARTITIONED BY (b STRING)"); + doTestFormatTable("partition_table"); + } + + private void doTestFormatTable(String tableName) throws Exception { + hiveShell.execute( + String.format("INSERT INTO %s VALUES (100, 'Hive'), (200, 'Table')", tableName)); + assertThat(collect(String.format("SELECT * FROM %s", tableName))) + .containsExactlyInAnyOrder(Row.of(100, "Hive"), Row.of(200, "Table")); + tEnv.executeSql(String.format("INSERT INTO %s VALUES (300, 'Paimon')", tableName)).await(); + assertThat(collect(String.format("SELECT * FROM %s", tableName))) + .containsExactlyInAnyOrder( + Row.of(100, "Hive"), Row.of(200, "Table"), Row.of(300, "Paimon")); + } + + @Test + public void testListTables() throws Exception { + hiveShell.execute("CREATE TABLE list_table ( a INT, b STRING)"); + assertThat(collect("SHOW TABLES")).containsExactlyInAnyOrder(Row.of("list_table")); + } + + protected List collect(String sql) throws Exception { + List result = new ArrayList<>(); + try (CloseableIterator it = tEnv.executeSql(sql).collect()) { + while (it.hasNext()) { + result.add(it.next()); + } + } + return result; + } +}