From 253378850431ca97687847199e900d908e04e6df Mon Sep 17 00:00:00 2001 From: Kerwin <37063904+zhuangchong@users.noreply.github.com> Date: Mon, 25 Nov 2024 13:31:25 +0800 Subject: [PATCH] [hive] Improve paimon format table conversion hive table in hive catalog. (#4522) --- .../org/apache/paimon/table/FormatTable.java | 14 ++ .../org/apache/paimon/hive/HiveCatalog.java | 124 +++++++++--------- .../apache/paimon/hive/HiveCatalogTest.java | 4 +- 3 files changed, 76 insertions(+), 66 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FormatTable.java b/paimon-core/src/main/java/org/apache/paimon/table/FormatTable.java index a53ba545c25e..a4c7788c38af 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/FormatTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/FormatTable.java @@ -34,6 +34,7 @@ import javax.annotation.Nullable; import java.time.Duration; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -70,6 +71,19 @@ enum Format { CSV } + /** Parses a file format string to a corresponding {@link Format} enum constant. */ + static Format parseFormat(String fileFormat) { + try { + return Format.valueOf(fileFormat.toUpperCase()); + } catch (IllegalArgumentException e) { + throw new UnsupportedOperationException( + "Format table unsupported file format: " + + fileFormat + + ". Supported formats: " + + Arrays.toString(Format.values())); + } + } + /** Create a new builder for {@link FormatTable}. */ static FormatTable.Builder builder() { return new FormatTable.Builder(); diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java index ebd5a1edf89b..0ecc78469e15 100644 --- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java +++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java @@ -112,7 +112,6 @@ import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH; import static org.apache.paimon.utils.HadoopUtils.addHadoopConfIfFound; import static org.apache.paimon.utils.Preconditions.checkArgument; -import static org.apache.paimon.utils.Preconditions.checkNotNull; import static org.apache.paimon.utils.StringUtils.isNullOrWhitespaceOnly; /** A catalog implementation for Hive. */ @@ -122,7 +121,7 @@ public class HiveCatalog extends AbstractCatalog { // Reserved properties public static final String TABLE_TYPE_PROP = "table_type"; - public static final String PAIMON_TABLE_TYPE_VALUE = "paimon"; + public static final String PAIMON_TABLE_IDENTIFIER = "PAIMON"; // we don't include paimon-hive-connector as dependencies because it depends on // hive-exec @@ -766,33 +765,24 @@ private Table createHiveTable( } } - Table table = - newHmsTable(identifier, tblProperties, PAIMON_TABLE_TYPE_VALUE, externalTable); - updateHmsTable(table, identifier, tableSchema, PAIMON_TABLE_TYPE_VALUE, location); + Table table = newHmsTable(identifier, tblProperties, null, externalTable); + updateHmsTable(table, identifier, tableSchema, null, location); return table; } private Table createHiveFormatTable( Identifier identifier, TableSchema tableSchema, Path location, boolean externalTable) { - Options options = Options.fromMap(tableSchema.options()); - checkArgument(options.get(TYPE) == FORMAT_TABLE); + CoreOptions coreOptions = new CoreOptions(tableSchema.options()); + checkArgument(coreOptions.type() == FORMAT_TABLE); - String provider = tableSchema.options().get(FILE_FORMAT.key()); - checkNotNull(provider, FILE_FORMAT.key() + " should be configured."); - // valid supported format - FormatTable.Format.valueOf(provider.toUpperCase()); + // file.format option has a default value and cannot be empty. + FormatTable.Format provider = FormatTable.parseFormat(coreOptions.formatType()); Map tblProperties = new HashMap<>(); Table table = newHmsTable(identifier, tblProperties, provider, externalTable); updateHmsTable(table, identifier, tableSchema, provider, location); - if (FormatTable.Format.CSV.toString().equalsIgnoreCase(provider)) { - table.getSd() - .getSerdeInfo() - .getParameters() - .put(FIELD_DELIM, options.get(FIELD_DELIMITER)); - } return table; } @@ -879,7 +869,8 @@ private void alterTableToHms(Table table, Identifier identifier, TableSchema new throws TException, InterruptedException { updateHmsTablePars(table, newSchema); Path location = getTableLocation(identifier, table); - updateHmsTable(table, identifier, newSchema, newSchema.options().get("provider"), location); + // file format is null, because only data table support alter table. + updateHmsTable(table, identifier, newSchema, null, location); clients.execute(client -> HiveAlterTableUtils.alterTable(client, identifier, table)); } @@ -1059,12 +1050,9 @@ private boolean isExternalTable(Table table) { private Table newHmsTable( Identifier identifier, Map tableParameters, - String provider, + @Nullable FormatTable.Format provider, boolean externalTable) { long currentTimeMillis = System.currentTimeMillis(); - if (provider == null) { - provider = PAIMON_TABLE_TYPE_VALUE; - } Table table = new Table( identifier.getTableName(), @@ -1082,67 +1070,83 @@ private Table newHmsTable( externalTable ? TableType.EXTERNAL_TABLE.name() : TableType.MANAGED_TABLE.name()); - table.getParameters().put(TABLE_TYPE_PROP, provider.toUpperCase()); - if (PAIMON_TABLE_TYPE_VALUE.equalsIgnoreCase(provider)) { + + if (provider == null) { + // normal paimon table + table.getParameters().put(TABLE_TYPE_PROP, PAIMON_TABLE_IDENTIFIER); table.getParameters() .put(hive_metastoreConstants.META_TABLE_STORAGE, STORAGE_HANDLER_CLASS_NAME); } else { - table.getParameters().put(FILE_FORMAT.key(), provider.toLowerCase()); + // format table + table.getParameters().put(TABLE_TYPE_PROP, provider.name()); + table.getParameters().put(FILE_FORMAT.key(), provider.name().toLowerCase()); table.getParameters().put(TYPE.key(), FORMAT_TABLE.toString()); } + if (externalTable) { table.getParameters().put(HIVE_EXTERNAL_TABLE_PROP, "TRUE"); } return table; } - private String getSerdeClassName(String provider) { - if (provider == null || provider.equalsIgnoreCase(PAIMON_TABLE_TYPE_VALUE)) { - return SERDE_CLASS_NAME; - } else if (provider.equalsIgnoreCase("csv")) { - return "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"; - } else if (provider.equalsIgnoreCase("parquet")) { - return "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"; - } else if (provider.equalsIgnoreCase("orc")) { - return "org.apache.hadoop.hive.ql.io.orc.OrcSerde"; - } else { + private String getSerdeClassName(@Nullable FormatTable.Format provider) { + if (provider == null) { return SERDE_CLASS_NAME; } + switch (provider) { + case CSV: + return "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"; + case PARQUET: + return "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"; + case ORC: + return "org.apache.hadoop.hive.ql.io.orc.OrcSerde"; + } + return SERDE_CLASS_NAME; } - private String getInputFormatName(String provider) { - if (provider == null || provider.equalsIgnoreCase(PAIMON_TABLE_TYPE_VALUE)) { - return INPUT_FORMAT_CLASS_NAME; - } else if (provider.equalsIgnoreCase("csv")) { - return "org.apache.hadoop.mapred.TextInputFormat"; - } else if (provider.equalsIgnoreCase("parquet")) { - return "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"; - } else if (provider.equalsIgnoreCase("orc")) { - return "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat"; - } else { + private String getInputFormatName(@Nullable FormatTable.Format provider) { + if (provider == null) { return INPUT_FORMAT_CLASS_NAME; } + switch (provider) { + case CSV: + return "org.apache.hadoop.mapred.TextInputFormat"; + case PARQUET: + return "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"; + case ORC: + return "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat"; + } + return INPUT_FORMAT_CLASS_NAME; } - private String getOutputFormatClassName(String provider) { - if (provider == null || provider.equalsIgnoreCase(PAIMON_TABLE_TYPE_VALUE)) { - return OUTPUT_FORMAT_CLASS_NAME; - } else if (provider.equalsIgnoreCase("csv")) { - return "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat"; - } else if (provider.equalsIgnoreCase("parquet")) { - return "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"; - } else if (provider.equalsIgnoreCase("orc")) { - return "org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat"; - } else { + private String getOutputFormatClassName(@Nullable FormatTable.Format provider) { + if (provider == null) { return OUTPUT_FORMAT_CLASS_NAME; } + switch (provider) { + case CSV: + return "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat"; + case PARQUET: + return "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"; + case ORC: + return "org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat"; + } + return OUTPUT_FORMAT_CLASS_NAME; + } + + private Map setSerDeInfoParam(@Nullable FormatTable.Format provider) { + Map param = new HashMap<>(); + if (provider == FormatTable.Format.CSV) { + param.put(FIELD_DELIM, options.get(FIELD_DELIMITER)); + } + return param; } private void updateHmsTable( Table table, Identifier identifier, TableSchema schema, - String provider, + @Nullable FormatTable.Format provider, Path location) { StorageDescriptor sd = table.getSd() != null ? table.getSd() : new StorageDescriptor(); @@ -1206,14 +1210,6 @@ private void updateHmsTable( locationHelper.specifyTableLocation(table, location.toString()); } - private Map setSerDeInfoParam(String provider) { - Map param = new HashMap<>(); - if (provider != null && provider.equalsIgnoreCase("csv")) { - param.put(FIELD_DELIM, options.get(FIELD_DELIMITER)); - } - return param; - } - private void updateHmsTablePars(Table table, TableSchema schema) { if (syncAllProperties()) { table.getParameters().putAll(schema.options()); diff --git a/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/HiveCatalogTest.java b/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/HiveCatalogTest.java index 3ba3f89e412f..267bdf0c7100 100644 --- a/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/HiveCatalogTest.java +++ b/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/HiveCatalogTest.java @@ -55,7 +55,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTORECONNECTURLKEY; -import static org.apache.paimon.hive.HiveCatalog.PAIMON_TABLE_TYPE_VALUE; +import static org.apache.paimon.hive.HiveCatalog.PAIMON_TABLE_IDENTIFIER; import static org.apache.paimon.hive.HiveCatalog.TABLE_TYPE_PROP; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -218,7 +218,7 @@ public void testAddHiveTableParameters() { assertThat(tableProperties).containsEntry("comment", "this is a hive table"); assertThat(tableProperties) .containsEntry( - TABLE_TYPE_PROP, PAIMON_TABLE_TYPE_VALUE.toUpperCase(Locale.ROOT)); + TABLE_TYPE_PROP, PAIMON_TABLE_IDENTIFIER.toUpperCase(Locale.ROOT)); } catch (Exception e) { fail("Test failed due to exception: " + e.getMessage()); }