From daa25367a0245c6e57451f533d6a860f232cafaf Mon Sep 17 00:00:00 2001
From: herefree <841043203@qq.com>
Date: Tue, 22 Oct 2024 16:15:01 +0800
Subject: [PATCH] Support setting csv.field-delimiter when creating a format table
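
Make the CSV field delimiter of a format table round-trip through the Hive
Metastore. On the write path, HiveCatalog copies Paimon's
'csv.field-delimiter' option into the 'field.delim' SerDe property; on the
read path, HiveFormatTableUtils copies it back, falling back to Hive's
default delimiter '\u0001' when the property is absent.

Example (a minimal sketch; the table name is illustrative):

    CREATE TABLE csv_t (a INT, b STRING) WITH (
        'type' = 'format-table',
        'file.format' = 'csv',
        'csv.field-delimiter' = ';'
    );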
---
 .../org/apache/paimon/hive/HiveCatalog.java   | 26 +++++++++++++------
 .../paimon/hive/HiveFormatTableUtils.java     |  6 +++--
 .../HiveCatalogFormatTableITCaseBase.java     | 18 +++++++++++++
 3 files changed, 40 insertions(+), 10 deletions(-)

diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
index 4d9c482a20da..92bfaee3f847 100644
--- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
+++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
@@ -85,6 +85,7 @@
 import java.util.stream.Collectors;
 
 import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREWAREHOUSE;
+import static org.apache.hadoop.hive.serde.serdeConstants.FIELD_DELIM;
 import static org.apache.paimon.CoreOptions.FILE_FORMAT;
 import static org.apache.paimon.CoreOptions.TYPE;
 import static org.apache.paimon.TableType.FORMAT_TABLE;
@@ -99,6 +100,7 @@
 import static org.apache.paimon.options.CatalogOptions.SYNC_ALL_PROPERTIES;
 import static org.apache.paimon.options.CatalogOptions.TABLE_TYPE;
 import static org.apache.paimon.options.OptionsUtils.convertToPropertiesPrefixKey;
+import static org.apache.paimon.table.FormatTableOptions.FIELD_DELIMITER;
 import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;
 import static org.apache.paimon.utils.HadoopUtils.addHadoopConfIfFound;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
@@ -600,11 +602,11 @@ protected void createTableImpl(Identifier identifier, Schema schema) {
 
     private Table createHiveTable(Identifier identifier, TableSchema tableSchema) {
         Map<String, String> tblProperties;
-        String provider = "paimon";
+        String provider = PAIMON_TABLE_TYPE_VALUE;
         if (Options.fromMap(tableSchema.options()).get(TYPE) == FORMAT_TABLE) {
             provider = tableSchema.options().get(FILE_FORMAT.key());
         }
-        if (syncAllProperties() || !provider.equals("paimon")) {
+        if (syncAllProperties() || !provider.equals(PAIMON_TABLE_TYPE_VALUE)) {
             tblProperties = new HashMap<>(tableSchema.options());
             // add primary-key, partition-key to tblproperties
@@ -812,7 +814,7 @@ private Table newHmsTable(
                         hiveConf.get(TABLE_TYPE.key(), CatalogTableType.MANAGED.toString()),
                         CatalogTableType.class);
         if (provider == null) {
-            provider = "paimon";
+            provider = PAIMON_TABLE_TYPE_VALUE;
         }
         Table table =
                 new Table(
@@ -830,7 +832,7 @@ private Table newHmsTable(
                         null,
                         tableType.toString().toUpperCase(Locale.ROOT) + "_TABLE");
         table.getParameters().put(TABLE_TYPE_PROP, provider.toUpperCase());
-        if ("paimon".equalsIgnoreCase(provider)) {
+        if (PAIMON_TABLE_TYPE_VALUE.equalsIgnoreCase(provider)) {
             table.getParameters()
                     .put(hive_metastoreConstants.META_TABLE_STORAGE, STORAGE_HANDLER_CLASS_NAME);
         } else {
@@ -844,7 +846,7 @@
     }
 
     private String getSerdeClassName(String provider) {
-        if (provider == null || provider.equalsIgnoreCase("paimon")) {
+        if (provider == null || provider.equalsIgnoreCase(PAIMON_TABLE_TYPE_VALUE)) {
             return SERDE_CLASS_NAME;
         } else if (provider.equalsIgnoreCase("csv")) {
             return "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe";
@@ -858,7 +860,7 @@ private String getSerdeClassName(String provider) {
     }
 
     private String getInputFormatName(String provider) {
-        if (provider == null || provider.equalsIgnoreCase("paimon")) {
+        if (provider == null || provider.equalsIgnoreCase(PAIMON_TABLE_TYPE_VALUE)) {
             return INPUT_FORMAT_CLASS_NAME;
         } else if (provider.equalsIgnoreCase("csv")) {
             return "org.apache.hadoop.mapred.TextInputFormat";
@@ -872,7 +874,7 @@ private String getInputFormatName(String provider) {
     }
 
     private String getOutputFormatClassName(String provider) {
-        if (provider == null || provider.equalsIgnoreCase("paimon")) {
+        if (provider == null || provider.equalsIgnoreCase(PAIMON_TABLE_TYPE_VALUE)) {
             return OUTPUT_FORMAT_CLASS_NAME;
         } else if (provider.equalsIgnoreCase("csv")) {
             return "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat";
@@ -893,7 +895,7 @@ private void updateHmsTable(
         sd.setOutputFormat(getOutputFormatClassName(provider));
 
         SerDeInfo serDeInfo = sd.getSerdeInfo() != null ? sd.getSerdeInfo() : new SerDeInfo();
-        serDeInfo.setParameters(new HashMap<>());
+        serDeInfo.setParameters(setSerDeInfoParam(provider));
         serDeInfo.setSerializationLib(getSerdeClassName(provider));
         sd.setSerdeInfo(serDeInfo);
 
@@ -946,6 +948,14 @@ private void updateHmsTable(
         locationHelper.specifyTableLocation(table, getTableLocation(identifier).toString());
     }
 
+    private Map<String, String> setSerDeInfoParam(String provider) {
+        Map<String, String> param = new HashMap<>();
+        if ("csv".equalsIgnoreCase(provider)) {
+            param.put(FIELD_DELIM, options.get(FIELD_DELIMITER));
+        }
+        return param;
+    }
+
     private void updateHmsTablePars(Table table, TableSchema schema) {
         if (syncAllProperties()) {
             table.getParameters().putAll(schema.options());
diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveFormatTableUtils.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveFormatTableUtils.java
index 6ec9d3863700..5b166922a334 100644
--- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveFormatTableUtils.java
+++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveFormatTableUtils.java
@@ -58,11 +58,13 @@ public static FormatTable convertToFormatTable(Table hiveTable) {
         String comment = options.remove(COMMENT_PROP);
         String location = hiveTable.getSd().getLocation();
         Format format;
+        SerDeInfo serdeInfo = hiveTable.getSd().getSerdeInfo();
         if (Options.fromMap(options).get(TYPE) == FORMAT_TABLE) {
             format = Format.valueOf(options.get(FILE_FORMAT.key()).toUpperCase());
-            // field delimiter for csv leaves untouched
+            if (format == Format.CSV) {
+                options.put(FIELD_DELIMITER.key(), serdeInfo.getParameters().getOrDefault(FIELD_DELIM, "\u0001"));
+            }
         } else {
-            SerDeInfo serdeInfo = hiveTable.getSd().getSerdeInfo();
             String serLib = serdeInfo.getSerializationLib().toLowerCase();
             String inputFormat = hiveTable.getSd().getInputFormat();
             if (serLib.contains("parquet")) {
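
The two changes above make the delimiter survive a full write/read cycle: a
table created with 'csv.field-delimiter'=';' is stored in HMS with
'field.delim'=';' and converted back with the same option set. The resulting
metastore entry corresponds roughly to the following Hive DDL (a sketch for
illustration; the table name is made up):

    CREATE TABLE csv_t (a INT, b STRING)
    ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
    WITH SERDEPROPERTIES ('field.delim' = ';')
    STORED AS TEXTFILE;
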
diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogFormatTableITCaseBase.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogFormatTableITCaseBase.java
index 9a7ff1e586a1..7f5e5962f9e7 100644
--- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogFormatTableITCaseBase.java
+++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogFormatTableITCaseBase.java
@@ -136,6 +136,24 @@ public void testPartitionTable() throws Exception {
         doTestFormatTable("partition_table");
     }
 
+    @Test
+    public void testFlinkCreateCsvFormatTable() throws Exception {
+        tEnv.executeSql("CREATE TABLE flink_csv_table (a INT, b STRING) WITH ('type'='format-table', 'file.format'='csv')").await();
+        doTestFormatTable("flink_csv_table");
+    }
+
+    @Test
+    public void testFlinkCreateFormatTableWithDelimiter() throws Exception {
+        tEnv.executeSql("CREATE TABLE flink_csv_table_delimiter (a INT, b STRING) WITH ('type'='format-table', 'file.format'='csv', 'csv.field-delimiter'=';')").await();
+        doTestFormatTable("flink_csv_table_delimiter");
+    }
+
+    @Test
+    public void testFlinkCreatePartitionTable() throws Exception {
+        tEnv.executeSql("CREATE TABLE flink_partition_table (a INT, b STRING) PARTITIONED BY (b) WITH ('type'='format-table', 'file.format'='csv')").await();
+        doTestFormatTable("flink_partition_table");
+    }
+
     private void doTestFormatTable(String tableName) throws Exception {
         hiveShell.execute(
                 String.format("INSERT INTO %s VALUES (100, 'Hive'), (200, 'Table')", tableName));
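
A quick manual check from the Hive side (a sketch; exact output depends on
the Hive version):

    SHOW CREATE TABLE flink_csv_table_delimiter;
    -- expected to contain: WITH SERDEPROPERTIES ('field.delim'=';')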