diff --git a/docs/layouts/shortcodes/generated/hive_catalog_configuration.html b/docs/layouts/shortcodes/generated/hive_catalog_configuration.html
index e0257d301b6f..076a46232c3c 100644
--- a/docs/layouts/shortcodes/generated/hive_catalog_configuration.html
+++ b/docs/layouts/shortcodes/generated/hive_catalog_configuration.html
@@ -39,6 +39,12 @@
        </tr>
+        <tr>
+            <td><h5>format-table.enabled</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>Whether to support format tables. A format table corresponds to a regular Hive table, allowing read and write operations. However, reads and writes do not connect to the metastore, so newly added partitions are not reflected in the metastore and must be added manually as separate partition operations.</td>
+        </tr>
        <tr>
            <td><h5>hadoop-conf-dir</h5></td>
            <td style="word-wrap: break-word;">(none)</td>
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
index 365887113d1e..dab9508282fa 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
@@ -34,6 +34,7 @@
import org.apache.paimon.table.CatalogEnvironment;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
+import org.apache.paimon.table.FormatTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.sink.BatchWriteBuilder;
import org.apache.paimon.table.system.SystemTableLoader;
@@ -336,7 +337,11 @@ public Table getTable(Identifier identifier) throws TableNotExistException {
}
return table;
} else {
- return getDataTable(identifier);
+ try {
+ return getDataTable(identifier);
+ } catch (TableNotExistException e) {
+ return getFormatTable(identifier);
+ }
}
}
@@ -355,6 +360,17 @@ private FileStoreTable getDataTable(Identifier identifier) throws TableNotExistE
lineageMetaFactory));
}
+ /**
+ * Return a {@link FormatTable} identified by the given {@link Identifier}.
+     *
+     * <p>The default implementation always throws; catalogs that support format tables
+     * should override this method.
+     *
+     * @param identifier the identifier of the table
+ * @return The requested table
+ * @throws Catalog.TableNotExistException if the target does not exist
+ */
+ public FormatTable getFormatTable(Identifier identifier) throws Catalog.TableNotExistException {
+ throw new Catalog.TableNotExistException(identifier);
+ }
+
/**
* Get warehouse path for specified database. If a catalog would like to provide individual path
* for each database, this method can be `Override` in that catalog.
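
A note on the resolution order above: getTable first resolves a regular Paimon data
table and only falls back to getFormatTable when that lookup throws
TableNotExistException. Since the base getFormatTable always throws, a catalog opts
in by overriding it (as HiveCatalog does later in this patch). A minimal caller-side
sketch, assuming `catalog` is such a catalog and the identifier is hypothetical:

    Identifier id = Identifier.create("test_db", "csv_table"); // hypothetical table
    Table table = catalog.getTable(id);
    if (table instanceof FormatTable) {
        // Format tables carry no snapshots, tags or branches; most Table methods
        // throw UnsupportedOperationException (see FormatTable below).
        System.out.println(((FormatTable) table).location());
    }
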
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FormatTable.java b/paimon-core/src/main/java/org/apache/paimon/table/FormatTable.java
new file mode 100644
index 000000000000..2ec38a70d247
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/table/FormatTable.java
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table;
+
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.annotation.Public;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.manifest.ManifestFileMeta;
+import org.apache.paimon.stats.Statistics;
+import org.apache.paimon.table.sink.BatchWriteBuilder;
+import org.apache.paimon.table.sink.StreamWriteBuilder;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.SimpleFileReader;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalLong;
+
+/**
+ * A file format table refers to a directory containing multiple files of the same format.
+ * Operations on the table read from or write to those files, so existing data can be
+ * retrieved and new files can be added.
+ *
+ * <p>A partitioned file format table behaves like a standard Hive partitioned table:
+ * partitions are discovered and inferred from the directory structure.
+ *
+ * @since 0.9.0
+ */
+@Public
+public interface FormatTable extends Table {
+
+ /** Directory location in file system. */
+ String location();
+
+ /** Format of this table. */
+ Format format();
+
+ @Override
+    FormatTable copy(Map<String, String> dynamicOptions);
+
+ /** Currently supported formats. */
+ enum Format {
+ ORC,
+ PARQUET,
+ CSV
+ }
+
+ /** Create a new builder for {@link FormatTable}. */
+ static FormatTable.Builder builder() {
+ return new FormatTable.Builder();
+ }
+
+ /** Builder for {@link FormatTable}. */
+ class Builder {
+
+ private Identifier identifier;
+ private RowType rowType;
+        private List<String> partitionKeys;
+        private String location;
+        private FormatTable.Format format;
+        private Map<String, String> options;
+ @Nullable private String comment;
+
+ public FormatTable.Builder identifier(Identifier identifier) {
+ this.identifier = identifier;
+ return this;
+ }
+
+ public FormatTable.Builder rowType(RowType rowType) {
+ this.rowType = rowType;
+ return this;
+ }
+
+        public FormatTable.Builder partitionKeys(List<String> partitionKeys) {
+ this.partitionKeys = partitionKeys;
+ return this;
+ }
+
+ public FormatTable.Builder location(String location) {
+ this.location = location;
+ return this;
+ }
+
+ public FormatTable.Builder format(FormatTable.Format format) {
+ this.format = format;
+ return this;
+ }
+
+        public FormatTable.Builder options(Map<String, String> options) {
+ this.options = options;
+ return this;
+ }
+
+ public FormatTable.Builder comment(@Nullable String comment) {
+ this.comment = comment;
+ return this;
+ }
+
+ public FormatTable build() {
+ return new FormatTable.FormatTableImpl(
+ identifier, rowType, partitionKeys, location, format, options, comment);
+ }
+ }
+
+ /** An implementation for {@link FormatTable}. */
+ class FormatTableImpl implements FormatTable {
+
+ private final Identifier identifier;
+ private final RowType rowType;
+        private final List<String> partitionKeys;
+        private final String location;
+        private final Format format;
+        private final Map<String, String> options;
+ @Nullable private final String comment;
+
+ public FormatTableImpl(
+ Identifier identifier,
+ RowType rowType,
+                List<String> partitionKeys,
+                String location,
+                Format format,
+                Map<String, String> options,
+ @Nullable String comment) {
+ this.identifier = identifier;
+ this.rowType = rowType;
+ this.partitionKeys = partitionKeys;
+ this.location = location;
+ this.format = format;
+ this.options = options;
+ this.comment = comment;
+ }
+
+ @Override
+ public String name() {
+ return identifier.getTableName();
+ }
+
+ @Override
+ public String fullName() {
+ return identifier.getFullName();
+ }
+
+ @Override
+ public RowType rowType() {
+ return rowType;
+ }
+
+ @Override
+        public List<String> partitionKeys() {
+ return partitionKeys;
+ }
+
+ @Override
+        public List<String> primaryKeys() {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public String location() {
+ return location;
+ }
+
+ @Override
+ public Format format() {
+ return format;
+ }
+
+ @Override
+        public Map<String, String> options() {
+ return options;
+ }
+
+ @Override
+        public Optional<String> comment() {
+ return Optional.ofNullable(comment);
+ }
+
+ @Override
+        public FormatTable copy(Map<String, String> dynamicOptions) {
+            Map<String, String> newOptions = new HashMap<>(options);
+ newOptions.putAll(dynamicOptions);
+ return new FormatTableImpl(
+ identifier, rowType, partitionKeys, location, format, newOptions, comment);
+ }
+ }
+
+ // ===================== Unsupported ===============================
+
+ @Override
+    default Optional<Statistics> statistics() {
+ return Optional.empty();
+ }
+
+ @Override
+ default OptionalLong latestSnapshotId() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ default Snapshot snapshot(long snapshotId) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+    default SimpleFileReader<ManifestFileMeta> manifestListReader() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+    default SimpleFileReader<ManifestEntry> manifestFileReader() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ default void rollbackTo(long snapshotId) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ default void createTag(String tagName, long fromSnapshotId) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ default void createTag(String tagName, long fromSnapshotId, Duration timeRetained) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ default void createTag(String tagName) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ default void createTag(String tagName, Duration timeRetained) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ default void deleteTag(String tagName) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ default void rollbackTo(String tagName) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ default void createBranch(String branchName) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ default void createBranch(String branchName, String tagName) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ default void deleteBranch(String branchName) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ default void fastForward(String branchName) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ default ExpireSnapshots newExpireSnapshots() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ default ExpireSnapshots newExpireChangelog() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ default ReadBuilder newReadBuilder() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ default BatchWriteBuilder newBatchWriteBuilder() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ default StreamWriteBuilder newStreamWriteBuilder() {
+ throw new UnsupportedOperationException();
+ }
+}
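
The builder is the intended construction path for a FormatTable. A self-contained
sketch using only the API added above (database, table name and location are
hypothetical):

    import org.apache.paimon.catalog.Identifier;
    import org.apache.paimon.table.FormatTable;
    import org.apache.paimon.types.DataTypes;
    import org.apache.paimon.types.RowType;

    import java.util.Collections;
    import java.util.HashMap;

    public class FormatTableExample {
        public static void main(String[] args) {
            RowType rowType =
                    RowType.builder()
                            .field("a", DataTypes.INT())
                            .field("b", DataTypes.STRING())
                            .build();
            FormatTable table =
                    FormatTable.builder()
                            .identifier(Identifier.create("test_db", "csv_table"))
                            .rowType(rowType)
                            .partitionKeys(Collections.emptyList())
                            .location("/tmp/warehouse/test_db.db/csv_table") // hypothetical
                            .format(FormatTable.Format.CSV)
                            .options(new HashMap<>())
                            .comment(null)
                            .build();
            // copy() merges dynamic options into a new instance; the original is untouched.
            FormatTable copied =
                    table.copy(Collections.singletonMap("csv.field-delimiter", ";"));
            System.out.println(table.fullName() + " -> " + copied.options());
        }
    }
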
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FormatTableOptions.java b/paimon-core/src/main/java/org/apache/paimon/table/FormatTableOptions.java
new file mode 100644
index 000000000000..64f134d07588
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/table/FormatTableOptions.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table;
+
+import org.apache.paimon.options.ConfigOption;
+import org.apache.paimon.options.ConfigOptions;
+
+/** Options of {@link FormatTable}. */
+public class FormatTableOptions {
+
+    public static final ConfigOption<String> FIELD_DELIMITER =
+ ConfigOptions.key("csv.field-delimiter")
+ .stringType()
+ .defaultValue(",")
+ .withDescription("Optional field delimiter character (',' by default)");
+}
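
Reading the option back through Paimon's Options applies the ',' default when the
key is absent; a short sketch (the singleton map stands in for FormatTable#options()):

    import org.apache.paimon.options.Options;
    import org.apache.paimon.table.FormatTableOptions;

    import java.util.Collections;

    public class DelimiterExample {
        public static void main(String[] args) {
            Options options =
                    Options.fromMap(
                            Collections.singletonMap(
                                    FormatTableOptions.FIELD_DELIMITER.key(), ";"));
            System.out.println(options.get(FormatTableOptions.FIELD_DELIMITER)); // ";"
            System.out.println(new Options().get(FormatTableOptions.FIELD_DELIMITER)); // ","
        }
    }
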
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
index 7765215353a3..d8bd2343979f 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
@@ -30,6 +30,7 @@
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.FormatTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.utils.FileStorePathFactory;
@@ -258,6 +259,15 @@ private CatalogTable getTable(ObjectPath tablePath, @Nullable Long timestamp)
throw new TableNotExistException(getName(), tablePath);
}
+ if (table instanceof FormatTable) {
+ if (timestamp != null) {
+ throw new UnsupportedOperationException(
+ String.format(
+ "Format table %s cannot support as of timestamp.", tablePath));
+ }
+ return new FormatCatalogTable((FormatTable) table);
+ }
+
if (timestamp != null) {
Options options = new Options();
options.set(CoreOptions.SCAN_TIMESTAMP_MILLIS, timestamp);
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkTableFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkTableFactory.java
index 1617b4cc3327..96c81fdb720c 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkTableFactory.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkTableFactory.java
@@ -24,6 +24,7 @@
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.SchemaManager;
+import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DynamicTableSource;
@@ -42,12 +43,20 @@ public String factoryIdentifier() {
@Override
public DynamicTableSource createDynamicTableSource(Context context) {
+ CatalogTable table = context.getCatalogTable().getOrigin();
+ if (table instanceof FormatCatalogTable) {
+ return ((FormatCatalogTable) table).createTableSource(context);
+ }
createTableIfNeeded(context);
return super.createDynamicTableSource(context);
}
@Override
public DynamicTableSink createDynamicTableSink(Context context) {
+ CatalogTable table = context.getCatalogTable().getOrigin();
+ if (table instanceof FormatCatalogTable) {
+ return ((FormatCatalogTable) table).createTableSink(context);
+ }
createTableIfNeeded(context);
return super.createDynamicTableSink(context);
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FormatCatalogTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FormatCatalogTable.java
new file mode 100644
index 000000000000..95aff5d84796
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FormatCatalogTable.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink;
+
+import org.apache.paimon.table.FormatTable;
+
+import org.apache.flink.connector.file.table.FileSystemTableFactory;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.apache.flink.connector.file.table.FileSystemConnectorOptions.PATH;
+import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
+import static org.apache.flink.table.factories.FactoryUtil.FORMAT;
+import static org.apache.flink.table.types.utils.TypeConversions.fromLogicalToDataType;
+import static org.apache.paimon.flink.LogicalTypeConversion.toLogicalType;
+
+/** A {@link CatalogTable} to represent format table. */
+public class FormatCatalogTable implements CatalogTable {
+
+ private final FormatTable table;
+
+    private Map<String, String> cachedOptions;
+
+ public FormatCatalogTable(FormatTable table) {
+ this.table = table;
+ }
+
+ public FormatTable table() {
+ return table;
+ }
+
+ @Override
+ public Schema getUnresolvedSchema() {
+ return Schema.newBuilder()
+ .fromRowDataType(fromLogicalToDataType(toLogicalType(table.rowType())))
+ .build();
+ }
+
+ @Override
+ public boolean isPartitioned() {
+ return !table.partitionKeys().isEmpty();
+ }
+
+ @Override
+    public List<String> getPartitionKeys() {
+ return table.partitionKeys();
+ }
+
+ @Override
+    public CatalogTable copy(Map<String, String> map) {
+ return new FormatCatalogTable(table.copy(map));
+ }
+
+ @Override
+    public Map<String, String> getOptions() {
+ if (cachedOptions == null) {
+ cachedOptions = new HashMap<>();
+ FileSystemTableFactory fileSystemFactory = new FileSystemTableFactory();
+            Set<String> validOptions = new HashSet<>();
+ fileSystemFactory.requiredOptions().forEach(o -> validOptions.add(o.key()));
+ fileSystemFactory.optionalOptions().forEach(o -> validOptions.add(o.key()));
+ String format = table.format().name().toLowerCase();
+ table.options()
+ .forEach(
+ (k, v) -> {
+ if (validOptions.contains(k) || k.startsWith(format + ".")) {
+ cachedOptions.put(k, v);
+ }
+ });
+ cachedOptions.put(CONNECTOR.key(), "filesystem");
+ cachedOptions.put(PATH.key(), table.location());
+ cachedOptions.put(FORMAT.key(), format);
+ }
+ return cachedOptions;
+ }
+
+ @Override
+ public String getComment() {
+ return table.comment().orElse("");
+ }
+
+ @Override
+ public CatalogTable copy() {
+ return copy(Collections.emptyMap());
+ }
+
+ @Override
+    public Optional<String> getDescription() {
+ return table.comment();
+ }
+
+ @Override
+    public Optional<String> getDetailedDescription() {
+ return getDescription();
+ }
+
+ public DynamicTableSource createTableSource(DynamicTableFactory.Context context) {
+ return FactoryUtil.createDynamicTableSource(
+ null,
+ context.getObjectIdentifier(),
+ context.getCatalogTable(),
+ context.getConfiguration(),
+ context.getClassLoader(),
+ context.isTemporary());
+ }
+
+ public DynamicTableSink createTableSink(DynamicTableFactory.Context context) {
+ return FactoryUtil.createDynamicTableSink(
+ null,
+ context.getObjectIdentifier(),
+ context.getCatalogTable(),
+ context.getConfiguration(),
+ context.getClassLoader(),
+ context.isTemporary());
+ }
+}
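
getOptions is the bridge into Flink's built-in filesystem connector: it keeps only
keys that the filesystem factory declares, plus any key prefixed with the format
name, and injects connector, path and format. A fragment continuing the builder
sketch from FormatTable above (values illustrative):

    FormatCatalogTable catalogTable = new FormatCatalogTable(table); // table from the earlier sketch
    // Roughly: {connector=filesystem, path=/tmp/warehouse/test_db.db/csv_table,
    //           format=csv, csv.field-delimiter=;}
    System.out.println(catalogTable.getOptions());
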
diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
index 5da73341b857..12307521f882 100644
--- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
+++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
@@ -41,6 +41,7 @@
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.FormatTable;
import org.apache.paimon.table.TableType;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataTypes;
@@ -85,6 +86,7 @@
import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREWAREHOUSE;
import static org.apache.paimon.hive.HiveCatalogLock.acquireTimeout;
import static org.apache.paimon.hive.HiveCatalogLock.checkMaxSleep;
+import static org.apache.paimon.hive.HiveCatalogOptions.FORMAT_TABLE_ENABLED;
import static org.apache.paimon.hive.HiveCatalogOptions.HADOOP_CONF_DIR;
import static org.apache.paimon.hive.HiveCatalogOptions.HIVE_CONF_DIR;
import static org.apache.paimon.hive.HiveCatalogOptions.IDENTIFIER;
@@ -156,6 +158,10 @@ public HiveCatalog(
this.clients = new CachedClientPool(hiveConf, options, clientClassName);
}
+ private boolean formatTableEnabled() {
+ return options.get(FORMAT_TABLE_ENABLED);
+ }
+
@Override
public Optional<CatalogLockFactory> defaultLockFactory() {
return Optional.of(new HiveCatalogLockFactory());
@@ -415,10 +421,26 @@ public boolean tableExists(Identifier identifier) {
"Interrupted in call to tableExists " + identifier.getFullName(), e);
}
- return isPaimonTable(table)
- && tableSchemaInFileSystem(
- getTableLocation(identifier), identifier.getBranchNameOrDefault())
- .isPresent();
+ boolean isDataTable =
+ isPaimonTable(table)
+ && tableSchemaInFileSystem(
+ getTableLocation(identifier),
+ identifier.getBranchNameOrDefault())
+ .isPresent();
+ if (isDataTable) {
+ return true;
+ }
+
+ if (formatTableEnabled()) {
+ try {
+ HiveFormatTableUtils.convertToFormatTable(table);
+ return true;
+ } catch (UnsupportedOperationException e) {
+ return false;
+ }
+ }
+
+ return false;
}
private static boolean isPaimonTable(Table table) {
@@ -439,6 +461,35 @@ public TableSchema getDataTableSchema(Identifier identifier) throws TableNotExis
.orElseThrow(() -> new TableNotExistException(identifier));
}
+ @Override
+ public FormatTable getFormatTable(Identifier identifier) throws TableNotExistException {
+ if (!formatTableEnabled()) {
+ throw new TableNotExistException(identifier);
+ }
+
+ Table table;
+ try {
+ table =
+ clients.run(
+ client ->
+ client.getTable(
+ identifier.getDatabaseName(),
+ identifier.getTableName()));
+ } catch (NoSuchObjectException e) {
+ throw new TableNotExistException(identifier);
+ } catch (TException e) {
+ throw new RuntimeException(e);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ }
+ try {
+ return HiveFormatTableUtils.convertToFormatTable(table);
+ } catch (UnsupportedOperationException e) {
+ throw new TableNotExistException(identifier);
+ }
+ }
+
private boolean usingExternalTable() {
TableType tableType =
OptionsUtils.convertToEnum(
diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogOptions.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogOptions.java
index 38f73bc6bd65..c74fa447ea46 100644
--- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogOptions.java
+++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogOptions.java
@@ -85,5 +85,14 @@ public final class HiveCatalogOptions {
+ "E.g. specifying \"conf:a.b.c\" will add \"a.b.c\" to the key, and so that configurations with different default catalog wouldn't share the same client pool. Multiple conf elements can be specified."))
.build());
+    public static final ConfigOption<Boolean> FORMAT_TABLE_ENABLED =
+            ConfigOptions.key("format-table.enabled")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Whether to support format tables. A format table corresponds to a regular Hive table, allowing read and write operations. "
+                                    + "However, reads and writes do not connect to the metastore, so newly added partitions are not reflected in"
+                                    + " the metastore and must be added manually as separate partition operations.");
+
private HiveCatalogOptions() {}
}
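
The flag defaults to false and is passed as a catalog property. A sketch of the
property map, mirroring the IT case setup at the end of this patch (class name and
warehouse path hypothetical):

    import static org.apache.paimon.hive.HiveCatalogOptions.FORMAT_TABLE_ENABLED;

    import java.util.HashMap;
    import java.util.Map;

    public class FormatTableCatalogProps {
        public static Map<String, String> catalogProperties() {
            Map<String, String> options = new HashMap<>();
            options.put("metastore", "hive");
            options.put("warehouse", "/tmp/warehouse"); // hypothetical path
            options.put(FORMAT_TABLE_ENABLED.key(), "true"); // "format-table.enabled"
            return options;
        }
    }
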
diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveFormatTableUtils.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveFormatTableUtils.java
new file mode 100644
index 000000000000..dcb7c7ee74cb
--- /dev/null
+++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveFormatTableUtils.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.hive;
+
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.table.FormatTable;
+import org.apache.paimon.table.FormatTable.Format;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.Pair;
+
+import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.hadoop.hive.serde.serdeConstants.FIELD_DELIM;
+import static org.apache.paimon.catalog.Catalog.COMMENT_PROP;
+import static org.apache.paimon.table.FormatTableOptions.FIELD_DELIMITER;
+
+class HiveFormatTableUtils {
+
+ public static FormatTable convertToFormatTable(Table hiveTable) {
+ if (TableType.valueOf(hiveTable.getTableType()) == TableType.VIRTUAL_VIEW) {
+ throw new UnsupportedOperationException("Hive view is not supported.");
+ }
+
+ Identifier identifier = new Identifier(hiveTable.getDbName(), hiveTable.getTableName());
+        Map<String, String> options = new HashMap<>(hiveTable.getParameters());
+        List<String> partitionKeys = getFieldNames(hiveTable.getPartitionKeys());
+ RowType rowType = createRowType(hiveTable.getSd().getCols(), hiveTable.getPartitionKeys());
+ String comment = options.remove(COMMENT_PROP);
+ String location = hiveTable.getSd().getLocation();
+ Format format;
+ SerDeInfo serdeInfo = hiveTable.getSd().getSerdeInfo();
+ String serLib = serdeInfo.getSerializationLib().toLowerCase();
+ String inputFormat = hiveTable.getSd().getInputFormat();
+ if (serLib.contains("parquet")) {
+ format = Format.PARQUET;
+ } else if (serLib.contains("orc")) {
+ format = Format.ORC;
+ } else if (inputFormat.contains("Text")) {
+ format = Format.CSV;
+ // hive default field delimiter is '\u0001'
+ options.put(
+ FIELD_DELIMITER.key(),
+ serdeInfo.getParameters().getOrDefault(FIELD_DELIM, "\u0001"));
+ } else {
+ throw new UnsupportedOperationException("Unsupported table: " + hiveTable);
+ }
+ return FormatTable.builder()
+ .identifier(identifier)
+ .rowType(rowType)
+ .partitionKeys(partitionKeys)
+ .location(location)
+ .format(format)
+ .options(options)
+ .comment(comment)
+ .build();
+ }
+
+ /** Get field names from field schemas. */
+    private static List<String> getFieldNames(List<FieldSchema> fieldSchemas) {
+        List<String> names = new ArrayList<>(fieldSchemas.size());
+ for (FieldSchema fs : fieldSchemas) {
+ names.add(fs.getName());
+ }
+ return names;
+ }
+
+ /** Create a Paimon's Schema from Hive table's columns and partition keys. */
+ private static RowType createRowType(
+            List<FieldSchema> nonPartCols, List<FieldSchema> partitionKeys) {
+        List<FieldSchema> allCols = new ArrayList<>(nonPartCols);
+        allCols.addAll(partitionKeys);
+        Pair<String[], DataType[]> columnInformation = extractColumnInformation(allCols);
+ return RowType.builder()
+ .fields(columnInformation.getRight(), columnInformation.getLeft())
+ .build();
+ }
+
+    private static Pair<String[], DataType[]> extractColumnInformation(
+            List<FieldSchema> allCols) {
+ String[] colNames = new String[allCols.size()];
+ DataType[] colTypes = new DataType[allCols.size()];
+
+ for (int i = 0; i < allCols.size(); i++) {
+ FieldSchema fs = allCols.get(i);
+ colNames[i] = fs.getName();
+ colTypes[i] =
+ HiveTypeUtils.toPaimonType(
+ TypeInfoUtils.getTypeInfoFromTypeString(fs.getType()));
+ }
+
+ return Pair.of(colNames, colTypes);
+ }
+}
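
Format inference keys off the Hive serde and input format: a serialization lib
containing "parquet" or "orc" maps directly, while a Text input format becomes CSV
with the Hive field.delim carried over (Hive's default is '\u0001'). Expected
behavior for a delimited text table, assertion-style (`hiveTable` is assumed
fetched from the metastore, as in the IT cases below):

    // For: CREATE TABLE t (...) ROW FORMAT DELIMITED FIELDS TERMINATED BY ';' STORED AS TEXTFILE
    FormatTable t = HiveFormatTableUtils.convertToFormatTable(hiveTable);
    assert t.format() == FormatTable.Format.CSV;
    assert ";".equals(t.options().get("csv.field-delimiter"));
    // Hive views (VIRTUAL_VIEW) are rejected with UnsupportedOperationException.
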
diff --git a/paimon-hive/paimon-hive-connector-2.3/pom.xml b/paimon-hive/paimon-hive-connector-2.3/pom.xml
index 34c6c2d18f66..2e6cba83cec0 100644
--- a/paimon-hive/paimon-hive-connector-2.3/pom.xml
+++ b/paimon-hive/paimon-hive-connector-2.3/pom.xml
@@ -106,6 +106,13 @@ under the License.
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-csv</artifactId>
+            <version>${test.flink.version}</version>
+            <scope>test</scope>
+        </dependency>
+
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-test-utils</artifactId>
diff --git a/paimon-hive/paimon-hive-connector-2.3/src/test/java/org/apache/paimon/hive/Hive23CatalogFormatTableITCase.java b/paimon-hive/paimon-hive-connector-2.3/src/test/java/org/apache/paimon/hive/Hive23CatalogFormatTableITCase.java
new file mode 100644
index 000000000000..d2e277dd2272
--- /dev/null
+++ b/paimon-hive/paimon-hive-connector-2.3/src/test/java/org/apache/paimon/hive/Hive23CatalogFormatTableITCase.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.hive;
+
+import com.klarna.hiverunner.annotations.HiveRunnerSetup;
+import com.klarna.hiverunner.config.HiveRunnerConfig;
+import org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
+import org.junit.Test;
+
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_IN_TEST;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_TXN_MANAGER;
+
+/** IT cases for using Paimon {@link HiveCatalog} together with Paimon Hive 2.3 connector. */
+public class Hive23CatalogFormatTableITCase extends HiveCatalogFormatTableITCaseBase {
+
+ @HiveRunnerSetup
+ private static final HiveRunnerConfig CONFIG =
+ new HiveRunnerConfig() {
+ {
+ // catalog lock needs txn manager
+ // hive-3.x requires a proper txn manager to create ACID table
+ getHiveConfSystemOverride()
+ .put(HIVE_TXN_MANAGER.varname, DbTxnManager.class.getName());
+ getHiveConfSystemOverride().put(HIVE_SUPPORT_CONCURRENCY.varname, "true");
+ // tell TxnHandler to prepare txn DB
+ getHiveConfSystemOverride().put(HIVE_IN_TEST.varname, "true");
+ }
+ };
+
+ @Override
+ @Test
+ public void testPartitionTable() {
+        // Skipped: Hive 2.3 requires the partition columns to be specified explicitly
+        // when inserting into a partitioned table.
+ }
+}
diff --git a/paimon-hive/paimon-hive-connector-3.1/pom.xml b/paimon-hive/paimon-hive-connector-3.1/pom.xml
index 4863c8b4fdf4..a46c2bc182d6 100644
--- a/paimon-hive/paimon-hive-connector-3.1/pom.xml
+++ b/paimon-hive/paimon-hive-connector-3.1/pom.xml
@@ -100,6 +100,13 @@ under the License.
            <scope>test</scope>
        </dependency>

+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-csv</artifactId>
+            <version>${test.flink.version}</version>
+            <scope>test</scope>
+        </dependency>
+
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-orc</artifactId>
diff --git a/paimon-hive/paimon-hive-connector-3.1/src/test/java/org/apache/paimon/hive/Hive31CatalogFormatTableITCase.java b/paimon-hive/paimon-hive-connector-3.1/src/test/java/org/apache/paimon/hive/Hive31CatalogFormatTableITCase.java
new file mode 100644
index 000000000000..6563759ce740
--- /dev/null
+++ b/paimon-hive/paimon-hive-connector-3.1/src/test/java/org/apache/paimon/hive/Hive31CatalogFormatTableITCase.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.hive;
+
+import com.klarna.hiverunner.annotations.HiveRunnerSetup;
+import com.klarna.hiverunner.config.HiveRunnerConfig;
+import org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
+
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_IN_TEST;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_TXN_MANAGER;
+
+/** IT cases for using Paimon {@link HiveCatalog} together with Paimon Hive 3.1 connector. */
+public class Hive31CatalogFormatTableITCase extends HiveCatalogFormatTableITCaseBase {
+
+ @HiveRunnerSetup
+ private static final HiveRunnerConfig CONFIG =
+ new HiveRunnerConfig() {
+ {
+ // catalog lock needs txn manager
+ // hive-3.x requires a proper txn manager to create ACID table
+ getHiveConfSystemOverride()
+ .put(HIVE_TXN_MANAGER.varname, DbTxnManager.class.getName());
+ getHiveConfSystemOverride().put(HIVE_SUPPORT_CONCURRENCY.varname, "true");
+ // tell TxnHandler to prepare txn DB
+ getHiveConfSystemOverride().put(HIVE_IN_TEST.varname, "true");
+ }
+ };
+}
diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogFormatTableITCaseBase.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogFormatTableITCaseBase.java
new file mode 100644
index 000000000000..9a7ff1e586a1
--- /dev/null
+++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogFormatTableITCaseBase.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.hive;
+
+import org.apache.paimon.hive.runner.PaimonEmbeddedHiveRunner;
+
+import com.klarna.hiverunner.HiveShell;
+import com.klarna.hiverunner.annotations.HiveSQL;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.internal.TableEnvironmentImpl;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestRule;
+import org.junit.runner.RunWith;
+import org.junit.runners.model.Statement;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.paimon.hive.HiveCatalogOptions.FORMAT_TABLE_ENABLED;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** IT cases for format tables in Paimon {@link HiveCatalog}. */
+@RunWith(PaimonEmbeddedHiveRunner.class)
+public abstract class HiveCatalogFormatTableITCaseBase {
+
+ @Rule public TemporaryFolder folder = new TemporaryFolder();
+
+ protected String path;
+ protected TableEnvironment tEnv;
+
+ @HiveSQL(files = {})
+ protected static HiveShell hiveShell;
+
+ private void before(boolean locationInProperties) throws Exception {
+ this.path = folder.newFolder().toURI().toString();
+        Map<String, String> options = new HashMap<>();
+ options.put("type", "paimon");
+ options.put("metastore", "hive");
+ options.put("uri", "");
+ options.put("lock.enabled", "true");
+ options.put("location-in-properties", String.valueOf(locationInProperties));
+ options.put("warehouse", path);
+ options.put(FORMAT_TABLE_ENABLED.key(), "true");
+ tEnv = TableEnvironmentImpl.create(EnvironmentSettings.newInstance().inBatchMode().build());
+ tEnv.executeSql(
+ String.join(
+ "\n",
+ "CREATE CATALOG my_hive WITH (",
+ options.entrySet().stream()
+ .map(
+ e ->
+ String.format(
+ "'%s' = '%s'",
+ e.getKey(), e.getValue()))
+ .collect(Collectors.joining(",\n")),
+ ")"))
+ .await();
+
+ tEnv.executeSql("USE CATALOG my_hive").await();
+ tEnv.executeSql("DROP DATABASE IF EXISTS test_db CASCADE");
+ tEnv.executeSql("CREATE DATABASE test_db").await();
+ tEnv.executeSql("USE test_db").await();
+
+ hiveShell.execute("USE test_db");
+ }
+
+ private void after() {
+ hiveShell.execute("DROP DATABASE IF EXISTS test_db CASCADE");
+ }
+
+ @Target(ElementType.METHOD)
+ @Retention(RetentionPolicy.RUNTIME)
+ private @interface LocationInProperties {}
+
+ @Rule
+ public TestRule environmentRule =
+ (base, description) ->
+ new Statement() {
+ @Override
+ public void evaluate() throws Throwable {
+ try {
+ before(
+ description.getAnnotation(LocationInProperties.class)
+ != null);
+ base.evaluate();
+ } finally {
+ after();
+ }
+ }
+ };
+
+ @Test
+ public void testCsvFormatTable() throws Exception {
+ hiveShell.execute("CREATE TABLE csv_table (a INT, b STRING)");
+ doTestFormatTable("csv_table");
+ }
+
+ @Test
+ public void testCsvFormatTableWithDelimiter() throws Exception {
+ hiveShell.execute(
+ "CREATE TABLE csv_table_delimiter (a INT, b STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY ';'");
+ doTestFormatTable("csv_table_delimiter");
+ }
+
+ @Test
+ public void testPartitionTable() throws Exception {
+ hiveShell.execute("CREATE TABLE partition_table (a INT) PARTITIONED BY (b STRING)");
+ doTestFormatTable("partition_table");
+ }
+
+ private void doTestFormatTable(String tableName) throws Exception {
+ hiveShell.execute(
+ String.format("INSERT INTO %s VALUES (100, 'Hive'), (200, 'Table')", tableName));
+ assertThat(collect(String.format("SELECT * FROM %s", tableName)))
+ .containsExactlyInAnyOrder(Row.of(100, "Hive"), Row.of(200, "Table"));
+ tEnv.executeSql(String.format("INSERT INTO %s VALUES (300, 'Paimon')", tableName)).await();
+ assertThat(collect(String.format("SELECT * FROM %s", tableName)))
+ .containsExactlyInAnyOrder(
+ Row.of(100, "Hive"), Row.of(200, "Table"), Row.of(300, "Paimon"));
+ }
+
+ @Test
+ public void testListTables() throws Exception {
+ hiveShell.execute("CREATE TABLE list_table ( a INT, b STRING)");
+ assertThat(collect("SHOW TABLES")).containsExactlyInAnyOrder(Row.of("list_table"));
+ }
+
+    protected List<Row> collect(String sql) throws Exception {
+        List<Row> result = new ArrayList<>();
+        try (CloseableIterator<Row> it = tEnv.executeSql(sql).collect()) {
+ while (it.hasNext()) {
+ result.add(it.next());
+ }
+ }
+ return result;
+ }
+}