From 4cdaf66dfbe492d7b6d3c18fbd251b19d39c1f7a Mon Sep 17 00:00:00 2001 From: Jingsong Lee Date: Thu, 7 Nov 2024 15:24:25 +0800 Subject: [PATCH] [hive] Enable Format Table by default (#4461) --- .../generated/catalog_configuration.html | 6 ++ .../generated/hive_catalog_configuration.html | 6 -- .../apache/paimon/options/CatalogOptions.java | 9 +++ .../paimon/table/FormatTableOptions.java | 5 +- .../paimon/catalog/CatalogTestBase.java | 47 +++++++++++++++ .../org/apache/paimon/flink/FlinkCatalog.java | 4 +- .../flink/FlinkGenericCatalogFactory.java | 1 + .../paimon/flink/FormatCatalogTable.java | 25 ++++---- .../paimon/flink/CatalogTableITCase.java | 3 +- .../org/apache/paimon/hive/HiveCatalog.java | 57 +++++++++++++------ .../paimon/hive/HiveCatalogOptions.java | 9 --- .../apache/paimon/hive/HiveTableUtils.java | 41 +++++-------- .../apache/paimon/hive/HiveCatalogTest.java | 5 ++ .../paimon/hive/Hive23CatalogITCase.java | 3 +- .../paimon/hive/Hive31CatalogITCase.java | 1 - .../HiveCatalogFormatTableITCaseBase.java | 4 +- .../paimon/hive/HiveCatalogITCaseBase.java | 24 ++------ .../org/apache/paimon/spark/SparkCatalog.java | 6 +- .../paimon/spark/SparkGenericCatalog.java | 1 + .../spark/SparkCatalogWithHiveTest.java | 44 +++++++------- 20 files changed, 179 insertions(+), 122 deletions(-) diff --git a/docs/layouts/shortcodes/generated/catalog_configuration.html b/docs/layouts/shortcodes/generated/catalog_configuration.html index 8954e898f1c2..3686fa20c68a 100644 --- a/docs/layouts/shortcodes/generated/catalog_configuration.html +++ b/docs/layouts/shortcodes/generated/catalog_configuration.html @@ -74,6 +74,12 @@ Integer Configure the size of the connection pool. + +
format-table.enabled
+ true + Boolean + Whether to support format tables, format table corresponds to a regular csv, parquet or orc table, allowing read and write operations. However, during these processes, it does not connect to the metastore; hence, newly added partitions will not be reflected in the metastore and need to be manually added as separate partition operations. +
lineage-meta
(none) diff --git a/docs/layouts/shortcodes/generated/hive_catalog_configuration.html b/docs/layouts/shortcodes/generated/hive_catalog_configuration.html index 076a46232c3c..e0257d301b6f 100644 --- a/docs/layouts/shortcodes/generated/hive_catalog_configuration.html +++ b/docs/layouts/shortcodes/generated/hive_catalog_configuration.html @@ -39,12 +39,6 @@ String Specify client cache key, multiple elements separated by commas.
- -
format-table.enabled
- false - Boolean - Whether to support format tables, format table corresponds to a regular Hive table, allowing read and write operations. However, during these processes, it does not connect to the metastore; hence, newly added partitions will not be reflected in the metastore and need to be manually added as separate partition operations. -
hadoop-conf-dir
(none) diff --git a/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java b/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java index ace4daf5e263..6ad9f3350adf 100644 --- a/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java +++ b/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java @@ -156,4 +156,13 @@ public class CatalogOptions { .booleanType() .defaultValue(false) .withDescription("Sync all table properties to hive metastore"); + + public static final ConfigOption FORMAT_TABLE_ENABLED = + ConfigOptions.key("format-table.enabled") + .booleanType() + .defaultValue(true) + .withDescription( + "Whether to support format tables, format table corresponds to a regular csv, parquet or orc table, allowing read and write operations. " + + "However, during these processes, it does not connect to the metastore; hence, newly added partitions will not be reflected in" + + " the metastore and need to be manually added as separate partition operations."); } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FormatTableOptions.java b/paimon-core/src/main/java/org/apache/paimon/table/FormatTableOptions.java index 64f134d07588..b4010209c32a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/FormatTableOptions.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/FormatTableOptions.java @@ -25,8 +25,9 @@ public class FormatTableOptions { public static final ConfigOption FIELD_DELIMITER = - ConfigOptions.key("csv.field-delimiter") + ConfigOptions.key("field-delimiter") .stringType() .defaultValue(",") - .withDescription("Optional field delimiter character (',' by default)"); + .withDescription( + "Optional field delimiter character for CSV (',' by default)."); } diff --git a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java index 27992b56f04f..dbeedcfe5b9c 100644 --- a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java +++ b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java @@ -25,6 +25,7 @@ import org.apache.paimon.options.Options; import org.apache.paimon.schema.Schema; import org.apache.paimon.schema.SchemaChange; +import org.apache.paimon.table.FormatTable; import org.apache.paimon.table.Table; import org.apache.paimon.types.DataField; import org.apache.paimon.types.DataTypes; @@ -885,4 +886,50 @@ public void testView() throws Exception { assertThatThrownBy(() -> catalog.dropView(newIdentifier, false)) .isInstanceOf(Catalog.ViewNotExistException.class); } + + protected boolean supportsFormatTable() { + return false; + } + + @Test + public void testFormatTable() throws Exception { + if (!supportsFormatTable()) { + return; + } + + Identifier identifier = new Identifier("format_db", "my_format"); + catalog.createDatabase(identifier.getDatabaseName(), false); + + // create table + Schema schema = + Schema.newBuilder() + .column("str", DataTypes.STRING()) + .column("int", DataTypes.INT()) + .option("type", "format-table") + .option("file.format", "csv") + .build(); + catalog.createTable(identifier, schema, false); + assertThat(catalog.listTables(identifier.getDatabaseName())) + .contains(identifier.getTableName()); + assertThat(catalog.getTable(identifier)).isInstanceOf(FormatTable.class); + + // alter table + SchemaChange schemaChange = SchemaChange.addColumn("new_col", DataTypes.STRING()); + assertThatThrownBy(() -> catalog.alterTable(identifier, schemaChange, false)) + .isInstanceOf(UnsupportedOperationException.class) + .hasMessage("Only data table support alter table."); + + // drop table + catalog.dropTable(identifier, false); + assertThatThrownBy(() -> catalog.getTable(identifier)) + .isInstanceOf(Catalog.TableNotExistException.class); + + // rename table + catalog.createTable(identifier, schema, false); + Identifier newIdentifier = new Identifier("format_db", "new_format"); + catalog.renameTable(identifier, newIdentifier, false); + assertThatThrownBy(() -> catalog.getTable(identifier)) + .isInstanceOf(Catalog.TableNotExistException.class); + assertThat(catalog.getTable(newIdentifier)).isInstanceOf(FormatTable.class); + } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java index ec485d2ebf57..cae6e6f0e367 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java @@ -768,7 +768,9 @@ public void alterTable( throw new TableNotExistException(getName(), tablePath); } - Preconditions.checkArgument(table instanceof FileStoreTable, "Can't alter system table."); + checkArgument( + table instanceof FileStoreTable, + "Only support alter data table, but is: " + table.getClass()); validateAlterTable(toCatalogTable(table), newTable); Map oldTableNonPhysicalColumnIndex = FlinkCatalogPropertiesUtil.nonPhysicalColumns( diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkGenericCatalogFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkGenericCatalogFactory.java index dc2a0f06b648..7c3a13c6f377 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkGenericCatalogFactory.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkGenericCatalogFactory.java @@ -89,6 +89,7 @@ public static FlinkGenericCatalog createCatalog( ClassLoader cl, Map optionMap, String name, Catalog flinkCatalog) { Options options = Options.fromMap(optionMap); options.set(CatalogOptions.METASTORE, "hive"); + options.set(CatalogOptions.FORMAT_TABLE_ENABLED, false); FlinkCatalog paimon = new FlinkCatalog( org.apache.paimon.catalog.CatalogFactory.createCatalog( diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FormatCatalogTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FormatCatalogTable.java index 95aff5d84796..2e944f930cbb 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FormatCatalogTable.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FormatCatalogTable.java @@ -20,7 +20,6 @@ import org.apache.paimon.table.FormatTable; -import org.apache.flink.connector.file.table.FileSystemTableFactory; import org.apache.flink.table.api.Schema; import org.apache.flink.table.catalog.CatalogTable; import org.apache.flink.table.connector.sink.DynamicTableSink; @@ -30,17 +29,16 @@ import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.Set; import static org.apache.flink.connector.file.table.FileSystemConnectorOptions.PATH; import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR; import static org.apache.flink.table.factories.FactoryUtil.FORMAT; import static org.apache.flink.table.types.utils.TypeConversions.fromLogicalToDataType; import static org.apache.paimon.flink.LogicalTypeConversion.toLogicalType; +import static org.apache.paimon.table.FormatTableOptions.FIELD_DELIMITER; /** A {@link CatalogTable} to represent format table. */ public class FormatCatalogTable implements CatalogTable { @@ -83,18 +81,17 @@ public CatalogTable copy(Map map) { public Map getOptions() { if (cachedOptions == null) { cachedOptions = new HashMap<>(); - FileSystemTableFactory fileSystemFactory = new FileSystemTableFactory(); - Set validOptions = new HashSet<>(); - fileSystemFactory.requiredOptions().forEach(o -> validOptions.add(o.key())); - fileSystemFactory.optionalOptions().forEach(o -> validOptions.add(o.key())); String format = table.format().name().toLowerCase(); - table.options() - .forEach( - (k, v) -> { - if (validOptions.contains(k) || k.startsWith(format + ".")) { - cachedOptions.put(k, v); - } - }); + Map options = table.options(); + options.forEach( + (k, v) -> { + if (k.startsWith(format + ".")) { + cachedOptions.put(k, v); + } + }); + if (options.containsKey(FIELD_DELIMITER.key())) { + cachedOptions.put("csv.field-delimiter", options.get(FIELD_DELIMITER.key())); + } cachedOptions.put(CONNECTOR.key(), "filesystem"); cachedOptions.put(PATH.key(), table.location()); cachedOptions.put(FORMAT.key(), format); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java index 9c1a2f4e3918..439cdf958f50 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java @@ -189,7 +189,8 @@ public void testCreateSystemDatabase() { public void testChangeTableInSystemDatabase() { sql("USE sys"); assertThatCode(() -> sql("ALTER TABLE all_table_options SET ('bucket-num' = '5')")) - .hasRootCauseMessage("Can't alter system table."); + .rootCause() + .hasMessageContaining("Only support alter data table, but is: "); } @Test diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java index ce1607e8d341..7c9bbde95f6c 100644 --- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java +++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java @@ -44,6 +44,7 @@ import org.apache.paimon.table.CatalogTableType; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.FileStoreTableFactory; +import org.apache.paimon.table.FormatTable; import org.apache.paimon.types.DataField; import org.apache.paimon.types.DataTypes; import org.apache.paimon.types.RowType; @@ -97,13 +98,13 @@ import static org.apache.paimon.TableType.FORMAT_TABLE; import static org.apache.paimon.hive.HiveCatalogLock.acquireTimeout; import static org.apache.paimon.hive.HiveCatalogLock.checkMaxSleep; -import static org.apache.paimon.hive.HiveCatalogOptions.FORMAT_TABLE_ENABLED; import static org.apache.paimon.hive.HiveCatalogOptions.HADOOP_CONF_DIR; import static org.apache.paimon.hive.HiveCatalogOptions.HIVE_CONF_DIR; import static org.apache.paimon.hive.HiveCatalogOptions.IDENTIFIER; import static org.apache.paimon.hive.HiveCatalogOptions.LOCATION_IN_PROPERTIES; import static org.apache.paimon.hive.HiveTableUtils.convertToFormatTable; import static org.apache.paimon.options.CatalogOptions.ALLOW_UPPER_CASE; +import static org.apache.paimon.options.CatalogOptions.FORMAT_TABLE_ENABLED; import static org.apache.paimon.options.CatalogOptions.SYNC_ALL_PROPERTIES; import static org.apache.paimon.options.CatalogOptions.TABLE_TYPE; import static org.apache.paimon.options.OptionsUtils.convertToPropertiesPrefixKey; @@ -111,6 +112,7 @@ import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH; import static org.apache.paimon.utils.HadoopUtils.addHadoopConfIfFound; import static org.apache.paimon.utils.Preconditions.checkArgument; +import static org.apache.paimon.utils.Preconditions.checkNotNull; import static org.apache.paimon.utils.StringUtils.isNullOrWhitespaceOnly; /** A catalog implementation for Hive. */ @@ -172,8 +174,8 @@ public HiveCatalog( this.clients = new CachedClientPool(hiveConf, options, clientClassName); } - private boolean formatTableEnabled() { - return options.get(FORMAT_TABLE_ENABLED); + private boolean formatTableDisabled() { + return !options.get(FORMAT_TABLE_ENABLED); } @Override @@ -607,7 +609,7 @@ public org.apache.paimon.table.Table getDataOrFormatTable(Identifier identifier) } catch (TableNotExistException ignore) { } - if (!formatTableEnabled()) { + if (formatTableDisabled()) { throw new TableNotExistException(identifier); } @@ -620,7 +622,7 @@ public org.apache.paimon.table.Table getDataOrFormatTable(Identifier identifier) @Override public void createFormatTable(Identifier identifier, Schema schema) { - if (!formatTableEnabled()) { + if (formatTableDisabled()) { throw new UnsupportedOperationException( "Format table is not enabled for " + identifier); } @@ -641,7 +643,7 @@ public void createFormatTable(Identifier identifier, Schema schema) { schema.comment()); try { Path location = getTableLocation(identifier, null); - Table hiveTable = createHiveTable(identifier, newSchema, location); + Table hiveTable = createHiveFormatTable(identifier, newSchema, location); clients.execute(client -> client.createTable(hiveTable)); } catch (Exception e) { // we don't need to delete directories since HMS will roll back db and fs if failed. @@ -727,12 +729,10 @@ protected void createTableImpl(Identifier identifier, Schema schema) { } private Table createHiveTable(Identifier identifier, TableSchema tableSchema, Path location) { + checkArgument(Options.fromMap(tableSchema.options()).get(TYPE) != FORMAT_TABLE); + Map tblProperties; - String provider = PAIMON_TABLE_TYPE_VALUE; - if (Options.fromMap(tableSchema.options()).get(TYPE) == FORMAT_TABLE) { - provider = tableSchema.options().get(FILE_FORMAT.key()); - } - if (syncAllProperties() || !provider.equals(PAIMON_TABLE_TYPE_VALUE)) { + if (syncAllProperties()) { tblProperties = new HashMap<>(tableSchema.options()); // add primary-key, partition-key to tblproperties @@ -748,8 +748,32 @@ private Table createHiveTable(Identifier identifier, TableSchema tableSchema, Pa } } + Table table = newHmsTable(identifier, tblProperties, PAIMON_TABLE_TYPE_VALUE); + updateHmsTable(table, identifier, tableSchema, PAIMON_TABLE_TYPE_VALUE, location); + return table; + } + + private Table createHiveFormatTable( + Identifier identifier, TableSchema tableSchema, Path location) { + Options options = Options.fromMap(tableSchema.options()); + checkArgument(options.get(TYPE) == FORMAT_TABLE); + + String provider = tableSchema.options().get(FILE_FORMAT.key()); + checkNotNull(provider, FILE_FORMAT.key() + " should be configured."); + // valid supported format + FormatTable.Format.valueOf(provider.toUpperCase()); + + Map tblProperties = new HashMap<>(); + Table table = newHmsTable(identifier, tblProperties, provider); updateHmsTable(table, identifier, tableSchema, provider, location); + + if (FormatTable.Format.CSV.toString().equalsIgnoreCase(provider)) { + table.getSd() + .getSerdeInfo() + .getParameters() + .put(FIELD_DELIM, options.get(FIELD_DELIMITER)); + } return table; } @@ -796,6 +820,11 @@ protected void renameTableImpl(Identifier fromTable, Identifier toTable) { @Override protected void alterTableImpl(Identifier identifier, List changes) throws TableNotExistException, ColumnAlreadyExistException, ColumnNotExistException { + Table table = getHmsTable(identifier); + if (!isPaimonTable(identifier, table)) { + throw new UnsupportedOperationException("Only data table support alter table."); + } + final SchemaManager schemaManager = schemaManager(identifier, getTableLocation(identifier)); // first commit changes to underlying files TableSchema schema = schemaManager.commitChanges(changes); @@ -805,12 +834,6 @@ protected void alterTableImpl(Identifier identifier, List changes) return; } try { - Table table = - clients.run( - client -> - client.getTable( - identifier.getDatabaseName(), - identifier.getTableName())); alterTableToHms(table, identifier, schema); } catch (Exception te) { schemaManager.deleteSchema(schema.id()); diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogOptions.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogOptions.java index c74fa447ea46..38f73bc6bd65 100644 --- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogOptions.java +++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogOptions.java @@ -85,14 +85,5 @@ public final class HiveCatalogOptions { + "E.g. specifying \"conf:a.b.c\" will add \"a.b.c\" to the key, and so that configurations with different default catalog wouldn't share the same client pool. Multiple conf elements can be specified.")) .build()); - public static final ConfigOption FORMAT_TABLE_ENABLED = - ConfigOptions.key("format-table.enabled") - .booleanType() - .defaultValue(false) - .withDescription( - "Whether to support format tables, format table corresponds to a regular Hive table, allowing read and write operations. " - + "However, during these processes, it does not connect to the metastore; hence, newly added partitions will not be reflected in" - + " the metastore and need to be manually added as separate partition operations."); - private HiveCatalogOptions() {} } diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveTableUtils.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveTableUtils.java index 0c88107c0849..fef2d395298f 100644 --- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveTableUtils.java +++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveTableUtils.java @@ -19,7 +19,6 @@ package org.apache.paimon.hive; import org.apache.paimon.catalog.Identifier; -import org.apache.paimon.options.Options; import org.apache.paimon.table.FormatTable; import org.apache.paimon.table.FormatTable.Format; import org.apache.paimon.types.DataType; @@ -38,9 +37,6 @@ import java.util.Map; import static org.apache.hadoop.hive.serde.serdeConstants.FIELD_DELIM; -import static org.apache.paimon.CoreOptions.FILE_FORMAT; -import static org.apache.paimon.CoreOptions.TYPE; -import static org.apache.paimon.TableType.FORMAT_TABLE; import static org.apache.paimon.catalog.Catalog.COMMENT_PROP; import static org.apache.paimon.table.FormatTableOptions.FIELD_DELIMITER; @@ -57,32 +53,25 @@ public static FormatTable convertToFormatTable(Table hiveTable) { RowType rowType = createRowType(hiveTable); String comment = options.remove(COMMENT_PROP); String location = hiveTable.getSd().getLocation(); + Format format; SerDeInfo serdeInfo = hiveTable.getSd().getSerdeInfo(); - if (Options.fromMap(options).get(TYPE) == FORMAT_TABLE) { - format = Format.valueOf(options.get(FILE_FORMAT.key()).toUpperCase()); - if (format.equals(Format.CSV)) { - options.put( - FIELD_DELIMITER.key(), - serdeInfo.getParameters().getOrDefault(FIELD_DELIM, "\u0001")); - } + String serLib = serdeInfo.getSerializationLib().toLowerCase(); + String inputFormat = hiveTable.getSd().getInputFormat(); + if (serLib.contains("parquet")) { + format = Format.PARQUET; + } else if (serLib.contains("orc")) { + format = Format.ORC; + } else if (inputFormat.contains("Text")) { + format = Format.CSV; + // hive default field delimiter is '\u0001' + options.put( + FIELD_DELIMITER.key(), + serdeInfo.getParameters().getOrDefault(FIELD_DELIM, "\u0001")); } else { - String serLib = serdeInfo.getSerializationLib().toLowerCase(); - String inputFormat = hiveTable.getSd().getInputFormat(); - if (serLib.contains("parquet")) { - format = Format.PARQUET; - } else if (serLib.contains("orc")) { - format = Format.ORC; - } else if (inputFormat.contains("Text")) { - format = Format.CSV; - // hive default field delimiter is '\u0001' - options.put( - FIELD_DELIMITER.key(), - serdeInfo.getParameters().getOrDefault(FIELD_DELIM, "\u0001")); - } else { - throw new UnsupportedOperationException("Unsupported table: " + hiveTable); - } + throw new UnsupportedOperationException("Unsupported table: " + hiveTable); } + return FormatTable.builder() .identifier(identifier) .rowType(rowType) diff --git a/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/HiveCatalogTest.java b/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/HiveCatalogTest.java index cb902605cf90..0f1218aeb2f3 100644 --- a/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/HiveCatalogTest.java +++ b/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/HiveCatalogTest.java @@ -273,4 +273,9 @@ public void testAlterHiveTableParameters() { protected boolean supportsView() { return true; } + + @Override + protected boolean supportsFormatTable() { + return true; + } } diff --git a/paimon-hive/paimon-hive-connector-2.3/src/test/java/org/apache/paimon/hive/Hive23CatalogITCase.java b/paimon-hive/paimon-hive-connector-2.3/src/test/java/org/apache/paimon/hive/Hive23CatalogITCase.java index c24e4a608021..8a4745a09022 100644 --- a/paimon-hive/paimon-hive-connector-2.3/src/test/java/org/apache/paimon/hive/Hive23CatalogITCase.java +++ b/paimon-hive/paimon-hive-connector-2.3/src/test/java/org/apache/paimon/hive/Hive23CatalogITCase.java @@ -113,7 +113,7 @@ public void testCustomConstructorMetastoreClient() throws Exception { } @Test - public void testCreateExistTableInHive() throws Exception { + public void testCreateExistTableInHive() { tEnv.executeSql( String.join( "\n", @@ -133,7 +133,6 @@ public void testCreateExistTableInHive() throws Exception { tEnv.executeSql( "CREATE TABLE hive_table(a INT, b INT, c INT, d INT)") .await()) - .isInstanceOf(TableException.class) .hasMessage( "Could not execute CreateTable in path `my_hive_custom_client`.`test_db`.`hive_table`"); assertThat( diff --git a/paimon-hive/paimon-hive-connector-3.1/src/test/java/org/apache/paimon/hive/Hive31CatalogITCase.java b/paimon-hive/paimon-hive-connector-3.1/src/test/java/org/apache/paimon/hive/Hive31CatalogITCase.java index 1a77723e948e..48d41d27e8d8 100644 --- a/paimon-hive/paimon-hive-connector-3.1/src/test/java/org/apache/paimon/hive/Hive31CatalogITCase.java +++ b/paimon-hive/paimon-hive-connector-3.1/src/test/java/org/apache/paimon/hive/Hive31CatalogITCase.java @@ -133,7 +133,6 @@ public void testCreateExistTableInHive() throws Exception { tEnv.executeSql( "CREATE TABLE hive_table(a INT, b INT, c INT, d INT)") .await()) - .isInstanceOf(TableException.class) .hasMessage( "Could not execute CreateTable in path `my_hive_custom_client`.`test_db`.`hive_table`"); assertThat( diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogFormatTableITCaseBase.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogFormatTableITCaseBase.java index cd748c9ac72f..fc58ad59527c 100644 --- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogFormatTableITCaseBase.java +++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogFormatTableITCaseBase.java @@ -44,7 +44,7 @@ import java.util.Map; import java.util.stream.Collectors; -import static org.apache.paimon.hive.HiveCatalogOptions.FORMAT_TABLE_ENABLED; +import static org.apache.paimon.options.CatalogOptions.FORMAT_TABLE_ENABLED; import static org.assertj.core.api.Assertions.assertThat; /** IT cases for using Paimon {@link HiveCatalog}. */ @@ -147,7 +147,7 @@ public void testFlinkCreateCsvFormatTable() throws Exception { @Test public void testFlinkCreateFormatTableWithDelimiter() throws Exception { tEnv.executeSql( - "CREATE TABLE flink_csv_table_delimiter (a INT, b STRING) with ('type'='format-table', 'file.format'='csv', 'csv.field-delimiter'=';')"); + "CREATE TABLE flink_csv_table_delimiter (a INT, b STRING) with ('type'='format-table', 'file.format'='csv', 'field-delimiter'=';')"); doTestFormatTable("flink_csv_table_delimiter"); } diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java index 37601f4f8c51..74d2d7e1c343 100644 --- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java +++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java @@ -39,7 +39,6 @@ import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.TableEnvironment; -import org.apache.flink.table.api.TableException; import org.apache.flink.table.api.TableResult; import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.api.config.ExecutionConfigOptions; @@ -275,7 +274,8 @@ public void testTableOperations() throws Exception { .await(); tEnv.executeSql("CREATE TABLE s ( a INT, b STRING ) WITH ( 'file.format' = 'avro' )") .await(); - assertThat(collect("SHOW TABLES")).isEqualTo(Arrays.asList(Row.of("s"), Row.of("t"))); + assertThat(collect("SHOW TABLES")) + .containsExactlyInAnyOrder(Row.of("s"), Row.of("t"), Row.of("hive_table")); tEnv.executeSql( "CREATE TABLE IF NOT EXISTS s ( a INT, b STRING ) WITH ( 'file.format' = 'avro' )") @@ -294,17 +294,14 @@ public void testTableOperations() throws Exception { Path tablePath = new Path(path, "test_db.db/s"); assertThat(tablePath.getFileSystem().exists(tablePath)).isTrue(); tEnv.executeSql("DROP TABLE s").await(); - assertThat(collect("SHOW TABLES")).isEqualTo(Collections.singletonList(Row.of("t"))); + assertThat(collect("SHOW TABLES")) + .containsExactlyInAnyOrder(Row.of("t"), Row.of("hive_table")); assertThat(tablePath.getFileSystem().exists(tablePath)).isFalse(); tEnv.executeSql("DROP TABLE IF EXISTS s").await(); assertThatThrownBy(() -> tEnv.executeSql("DROP TABLE s").await()) .isInstanceOf(ValidationException.class) .hasMessage("Table with identifier 'my_hive.test_db.s' does not exist."); - assertThatThrownBy(() -> tEnv.executeSql("DROP TABLE hive_table").await()) - .isInstanceOf(ValidationException.class) - .hasMessage("Table with identifier 'my_hive.test_db.hive_table' does not exist."); - // alter table tEnv.executeSql("ALTER TABLE t SET ( 'manifest.target-file-size' = '16MB' )").await(); List actual = collect("SHOW CREATE TABLE t"); @@ -329,9 +326,9 @@ public void testTableOperations() throws Exception { tEnv.executeSql( "ALTER TABLE hive_table SET ( 'manifest.target-file-size' = '16MB' )") .await()) - .isInstanceOf(RuntimeException.class) + .rootCause() .hasMessage( - "Table `my_hive`.`test_db`.`hive_table` doesn't exist or is a temporary table."); + "Only support alter data table, but is: class org.apache.paimon.table.FormatTable$FormatTableImpl"); } @Test @@ -656,15 +653,6 @@ public void testFlinkWriteAndHiveRead() throws Exception { Arrays.asList( "true\t1\t1\t1\t1234567890123456789\t1.23\t3.14159\t1234.56\tABC\tv1\tHello, World!\t01\t010203\t2023-01-01\t2023-01-01 12:00:00.123\t[\"value1\",\"value2\",\"value3\"]\tvalue1\tvalue1\tvalue2\t{\"f0\":\"v1\",\"f1\":1}\tv1\t1", "false\t2\t2\t2\t234567890123456789\t2.34\t2.111111\t2345.67\tDEF\tv2\tApache Paimon\t04\t040506\t2023-02-01\t2023-02-01 12:00:00.456\t[\"value4\",\"value5\",\"value6\"]\tvalue4\tvalue11\tvalue22\t{\"f0\":\"v2\",\"f1\":2}\tv2\t2")); - - assertThatThrownBy( - () -> - tEnv.executeSql( - "INSERT INTO hive_table VALUES (1, 'Hi'), (2, 'Hello')") - .await()) - .isInstanceOf(TableException.class) - .hasMessage( - "Cannot find table '`my_hive`.`test_db`.`hive_table`' in any of the catalogs [default_catalog, my_hive], nor as a temporary table."); } @Test diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java index 2e4a2eaec4ae..b500da8f19f2 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java @@ -28,6 +28,7 @@ import org.apache.paimon.spark.catalog.SparkBaseCatalog; import org.apache.paimon.spark.catalog.SupportFunction; import org.apache.paimon.table.FormatTable; +import org.apache.paimon.table.FormatTableOptions; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException; @@ -511,8 +512,11 @@ private static FileTable convertToFileTable(Identifier ident, FormatTable format StructType schema = SparkTypeUtils.fromPaimonRowType(formatTable.rowType()); List pathList = new ArrayList<>(); pathList.add(formatTable.location()); - CaseInsensitiveStringMap dsOptions = new CaseInsensitiveStringMap(formatTable.options()); + Options options = Options.fromMap(formatTable.options()); + CaseInsensitiveStringMap dsOptions = new CaseInsensitiveStringMap(options.toMap()); if (formatTable.format() == FormatTable.Format.CSV) { + options.set("sep", options.get(FormatTableOptions.FIELD_DELIMITER)); + dsOptions = new CaseInsensitiveStringMap(options.toMap()); return new CSVTable( ident.name(), SparkSession.active(), diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java index 12407f2614ff..4741bfd000dc 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java @@ -317,6 +317,7 @@ private void fillCommonConfigurations(Map options, SQLConf sqlCo options.put(METASTORE.key(), metastore); } } + options.put(CatalogOptions.FORMAT_TABLE_ENABLED.key(), "false"); String sessionCatalogDefaultDatabase = SQLConfUtils.defaultDatabase(sqlConf); if (options.containsKey(DEFAULT_DATABASE.key())) { String userDefineDefaultDatabase = options.get(DEFAULT_DATABASE.key()); diff --git a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkCatalogWithHiveTest.java b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkCatalogWithHiveTest.java index 04159b94a938..d42e84b9cfdd 100644 --- a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkCatalogWithHiveTest.java +++ b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkCatalogWithHiveTest.java @@ -21,6 +21,7 @@ import org.apache.paimon.fs.Path; import org.apache.paimon.hive.TestHiveMetastore; +import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; @@ -28,12 +29,14 @@ import org.junit.jupiter.api.io.TempDir; import java.io.FileNotFoundException; +import java.util.stream.Collectors; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; /** Base tests for spark read. */ public class SparkCatalogWithHiveTest { + private static TestHiveMetastore testHiveMetastore; private static final int PORT = 9087; @@ -51,7 +54,6 @@ public static void closeMetastore() throws Exception { @Test public void testCreateFormatTable(@TempDir java.nio.file.Path tempDir) { - // firstly, we use hive metastore to create table, and check the result. Path warehousePath = new Path("file:" + tempDir.toString()); SparkSession spark = SparkSession.builder() @@ -73,6 +75,9 @@ public void testCreateFormatTable(@TempDir java.nio.file.Path tempDir) { spark.sql("CREATE DATABASE IF NOT EXISTS my_db1"); spark.sql("USE spark_catalog.my_db1"); + + // test orc table + spark.sql("CREATE TABLE IF NOT EXISTS table_orc (a INT, bb INT, c STRING) USING orc"); assertThat( @@ -80,35 +85,30 @@ public void testCreateFormatTable(@TempDir java.nio.file.Path tempDir) { .map(s -> s.get(1)) .map(Object::toString)) .containsExactlyInAnyOrder("table_orc"); - spark.close(); - SparkSession spark1 = - SparkSession.builder() - .config("spark.sql.warehouse.dir", warehousePath.toString()) - // with hive metastore - .config("spark.sql.catalogImplementation", "hive") - .config("hive.metastore.uris", "thrift://localhost:" + PORT) - .config("spark.sql.catalog.spark_catalog", SparkCatalog.class.getName()) - .config("spark.sql.catalog.spark_catalog.metastore", "hive") - .config( - "spark.sql.catalog.spark_catalog.hive.metastore.uris", - "thrift://localhost:" + PORT) - .config("spark.sql.catalog.spark_catalog.format-table.enabled", "true") - .config( - "spark.sql.catalog.spark_catalog.warehouse", - warehousePath.toString()) - .master("local[2]") - .getOrCreate(); - spark1.sql("USE spark_catalog.my_db1"); assertThat( - spark1.sql("EXPLAIN EXTENDED SELECT * from table_orc").collectAsList() + spark.sql("EXPLAIN EXTENDED SELECT * from table_orc").collectAsList() .stream() .map(s -> s.get(0)) .map(Object::toString) .filter(s -> s.contains("OrcScan")) .count()) .isGreaterThan(0); - spark1.close(); + + // test csv table + + spark.sql( + "CREATE TABLE IF NOT EXISTS table_csv (a INT, bb INT, c STRING) USING csv OPTIONS ('field-delimiter' ';')"); + spark.sql("INSERT INTO table_csv VALUES (1, 1, '1'), (2, 2, '2')").collect(); + assertThat(spark.sql("DESCRIBE FORMATTED table_csv").collectAsList().toString()) + .contains("sep=;"); + assertThat( + spark.sql("SELECT * FROM table_csv").collectAsList().stream() + .map(Row::toString) + .collect(Collectors.toList())) + .containsExactlyInAnyOrder("[1,1,1]", "[2,2,2]"); + + spark.close(); } @Test