[hive] Improve Paimon format table conversion to Hive table in Hive catalog. (#4522)
zhuangchong authored Nov 25, 2024
1 parent be24886 commit 2533788
Showing 3 changed files with 76 additions and 66 deletions.
14 changes: 14 additions & 0 deletions paimon-core/src/main/java/org/apache/paimon/table/FormatTable.java
@@ -34,6 +34,7 @@
import javax.annotation.Nullable;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -70,6 +71,19 @@ enum Format {
CSV
}

/** Parses a file format string to a corresponding {@link Format} enum constant. */
static Format parseFormat(String fileFormat) {
try {
return Format.valueOf(fileFormat.toUpperCase());
} catch (IllegalArgumentException e) {
throw new UnsupportedOperationException(
"Format table unsupported file format: "
+ fileFormat
+ ". Supported formats: "
+ Arrays.toString(Format.values()));
}
}

/** Create a new builder for {@link FormatTable}. */
static FormatTable.Builder builder() {
return new FormatTable.Builder();
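
For orientation, a minimal usage sketch of the new parseFormat helper (illustrative calls, not part of this commit):

// Maps a case-insensitive file format name onto the Format enum.
FormatTable.Format format = FormatTable.parseFormat("parquet"); // -> Format.PARQUET
// Unknown names fail fast and report the supported formats:
// FormatTable.parseFormat("unknown"); // throws UnsupportedOperationException
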
@@ -112,7 +112,6 @@
import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;
import static org.apache.paimon.utils.HadoopUtils.addHadoopConfIfFound;
import static org.apache.paimon.utils.Preconditions.checkArgument;
import static org.apache.paimon.utils.Preconditions.checkNotNull;
import static org.apache.paimon.utils.StringUtils.isNullOrWhitespaceOnly;

/** A catalog implementation for Hive. */
@@ -122,7 +121,7 @@ public class HiveCatalog extends AbstractCatalog {

// Reserved properties
public static final String TABLE_TYPE_PROP = "table_type";
public static final String PAIMON_TABLE_TYPE_VALUE = "paimon";
public static final String PAIMON_TABLE_IDENTIFIER = "PAIMON";

// we don't include paimon-hive-connector as dependencies because it depends on
// hive-exec
@@ -766,33 +765,24 @@ private Table createHiveTable(
}
}

Table table =
newHmsTable(identifier, tblProperties, PAIMON_TABLE_TYPE_VALUE, externalTable);
updateHmsTable(table, identifier, tableSchema, PAIMON_TABLE_TYPE_VALUE, location);
Table table = newHmsTable(identifier, tblProperties, null, externalTable);
updateHmsTable(table, identifier, tableSchema, null, location);
return table;
}

private Table createHiveFormatTable(
Identifier identifier, TableSchema tableSchema, Path location, boolean externalTable) {
Options options = Options.fromMap(tableSchema.options());
checkArgument(options.get(TYPE) == FORMAT_TABLE);
CoreOptions coreOptions = new CoreOptions(tableSchema.options());
checkArgument(coreOptions.type() == FORMAT_TABLE);

String provider = tableSchema.options().get(FILE_FORMAT.key());
checkNotNull(provider, FILE_FORMAT.key() + " should be configured.");
// valid supported format
FormatTable.Format.valueOf(provider.toUpperCase());
// file.format option has a default value and cannot be empty.
FormatTable.Format provider = FormatTable.parseFormat(coreOptions.formatType());

Map<String, String> tblProperties = new HashMap<>();

Table table = newHmsTable(identifier, tblProperties, provider, externalTable);
updateHmsTable(table, identifier, tableSchema, provider, location);

if (FormatTable.Format.CSV.toString().equalsIgnoreCase(provider)) {
table.getSd()
.getSerdeInfo()
.getParameters()
.put(FIELD_DELIM, options.get(FIELD_DELIMITER));
}
return table;
}

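As a rough illustration of what createHiveFormatTable now produces for a CSV format table (assuming updateHmsTable wires the serde/input/output class names from the helpers below into the StorageDescriptor; the snippet is a sketch, not code from this commit):

// Hypothetical check of the HMS metadata created for a CSV format table.
Table hmsTable = createHiveFormatTable(identifier, tableSchema, location, false);
assert "org.apache.hadoop.mapred.TextInputFormat".equals(hmsTable.getSd().getInputFormat());
assert "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"
        .equals(hmsTable.getSd().getSerdeInfo().getSerializationLib());
// The configured field delimiter lands in the SerDe parameters via setSerDeInfoParam.
assert hmsTable.getSd().getSerdeInfo().getParameters().containsKey(FIELD_DELIM);
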
@@ -879,7 +869,8 @@ private void alterTableToHms(Table table, Identifier identifier, TableSchema new
throws TException, InterruptedException {
updateHmsTablePars(table, newSchema);
Path location = getTableLocation(identifier, table);
updateHmsTable(table, identifier, newSchema, newSchema.options().get("provider"), location);
// file format is null, because only data tables support alter table.
updateHmsTable(table, identifier, newSchema, null, location);
clients.execute(client -> HiveAlterTableUtils.alterTable(client, identifier, table));
}

@@ -1059,12 +1050,9 @@ private boolean isExternalTable(Table table) {
private Table newHmsTable(
Identifier identifier,
Map<String, String> tableParameters,
String provider,
@Nullable FormatTable.Format provider,
boolean externalTable) {
long currentTimeMillis = System.currentTimeMillis();
if (provider == null) {
provider = PAIMON_TABLE_TYPE_VALUE;
}
Table table =
new Table(
identifier.getTableName(),
@@ -1082,67 +1070,83 @@ private Table newHmsTable(
externalTable
? TableType.EXTERNAL_TABLE.name()
: TableType.MANAGED_TABLE.name());
table.getParameters().put(TABLE_TYPE_PROP, provider.toUpperCase());
if (PAIMON_TABLE_TYPE_VALUE.equalsIgnoreCase(provider)) {

if (provider == null) {
// normal paimon table
table.getParameters().put(TABLE_TYPE_PROP, PAIMON_TABLE_IDENTIFIER);
table.getParameters()
.put(hive_metastoreConstants.META_TABLE_STORAGE, STORAGE_HANDLER_CLASS_NAME);
} else {
table.getParameters().put(FILE_FORMAT.key(), provider.toLowerCase());
// format table
table.getParameters().put(TABLE_TYPE_PROP, provider.name());
table.getParameters().put(FILE_FORMAT.key(), provider.name().toLowerCase());
table.getParameters().put(TYPE.key(), FORMAT_TABLE.toString());
}

if (externalTable) {
table.getParameters().put(HIVE_EXTERNAL_TABLE_PROP, "TRUE");
}
return table;
}

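In short, newHmsTable now branches on a nullable FormatTable.Format instead of a provider string. A sketch of the resulting table parameters, mirroring the code above (the value shown for TABLE_TYPE_PROP uses the constants defined earlier in this diff):

// provider == null (regular Paimon data table):
//   table_type (TABLE_TYPE_PROP)          -> "PAIMON" (PAIMON_TABLE_IDENTIFIER)
//   META_TABLE_STORAGE                    -> STORAGE_HANDLER_CLASS_NAME
// provider == Format.ORC (format table):
//   table_type (TABLE_TYPE_PROP)          -> "ORC"  (provider.name())
//   FILE_FORMAT.key(), i.e. "file.format" -> "orc"  (provider.name().toLowerCase())
//   TYPE.key(), i.e. "type"               -> FORMAT_TABLE.toString()
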
private String getSerdeClassName(String provider) {
if (provider == null || provider.equalsIgnoreCase(PAIMON_TABLE_TYPE_VALUE)) {
return SERDE_CLASS_NAME;
} else if (provider.equalsIgnoreCase("csv")) {
return "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe";
} else if (provider.equalsIgnoreCase("parquet")) {
return "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe";
} else if (provider.equalsIgnoreCase("orc")) {
return "org.apache.hadoop.hive.ql.io.orc.OrcSerde";
} else {
private String getSerdeClassName(@Nullable FormatTable.Format provider) {
if (provider == null) {
return SERDE_CLASS_NAME;
}
switch (provider) {
case CSV:
return "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe";
case PARQUET:
return "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe";
case ORC:
return "org.apache.hadoop.hive.ql.io.orc.OrcSerde";
}
return SERDE_CLASS_NAME;
}

private String getInputFormatName(String provider) {
if (provider == null || provider.equalsIgnoreCase(PAIMON_TABLE_TYPE_VALUE)) {
return INPUT_FORMAT_CLASS_NAME;
} else if (provider.equalsIgnoreCase("csv")) {
return "org.apache.hadoop.mapred.TextInputFormat";
} else if (provider.equalsIgnoreCase("parquet")) {
return "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat";
} else if (provider.equalsIgnoreCase("orc")) {
return "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat";
} else {
private String getInputFormatName(@Nullable FormatTable.Format provider) {
if (provider == null) {
return INPUT_FORMAT_CLASS_NAME;
}
switch (provider) {
case CSV:
return "org.apache.hadoop.mapred.TextInputFormat";
case PARQUET:
return "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat";
case ORC:
return "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat";
}
return INPUT_FORMAT_CLASS_NAME;
}

private String getOutputFormatClassName(String provider) {
if (provider == null || provider.equalsIgnoreCase(PAIMON_TABLE_TYPE_VALUE)) {
return OUTPUT_FORMAT_CLASS_NAME;
} else if (provider.equalsIgnoreCase("csv")) {
return "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat";
} else if (provider.equalsIgnoreCase("parquet")) {
return "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat";
} else if (provider.equalsIgnoreCase("orc")) {
return "org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat";
} else {
private String getOutputFormatClassName(@Nullable FormatTable.Format provider) {
if (provider == null) {
return OUTPUT_FORMAT_CLASS_NAME;
}
switch (provider) {
case CSV:
return "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat";
case PARQUET:
return "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat";
case ORC:
return "org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat";
}
return OUTPUT_FORMAT_CLASS_NAME;
}

private Map<String, String> setSerDeInfoParam(@Nullable FormatTable.Format provider) {
Map<String, String> param = new HashMap<>();
if (provider == FormatTable.Format.CSV) {
param.put(FIELD_DELIM, options.get(FIELD_DELIMITER));
}
return param;
}

private void updateHmsTable(
Table table,
Identifier identifier,
TableSchema schema,
String provider,
@Nullable FormatTable.Format provider,
Path location) {
StorageDescriptor sd = table.getSd() != null ? table.getSd() : new StorageDescriptor();

@@ -1206,14 +1210,6 @@ private void updateHmsTable(
locationHelper.specifyTableLocation(table, location.toString());
}

private Map<String, String> setSerDeInfoParam(String provider) {
Map<String, String> param = new HashMap<>();
if (provider != null && provider.equalsIgnoreCase("csv")) {
param.put(FIELD_DELIM, options.get(FIELD_DELIMITER));
}
return param;
}

private void updateHmsTablePars(Table table, TableSchema schema) {
if (syncAllProperties()) {
table.getParameters().putAll(schema.options());
@@ -55,7 +55,7 @@
import java.util.concurrent.atomic.AtomicBoolean;

import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTORECONNECTURLKEY;
import static org.apache.paimon.hive.HiveCatalog.PAIMON_TABLE_TYPE_VALUE;
import static org.apache.paimon.hive.HiveCatalog.PAIMON_TABLE_IDENTIFIER;
import static org.apache.paimon.hive.HiveCatalog.TABLE_TYPE_PROP;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -218,7 +218,7 @@ public void testAddHiveTableParameters() {
assertThat(tableProperties).containsEntry("comment", "this is a hive table");
assertThat(tableProperties)
.containsEntry(
TABLE_TYPE_PROP, PAIMON_TABLE_TYPE_VALUE.toUpperCase(Locale.ROOT));
TABLE_TYPE_PROP, PAIMON_TABLE_IDENTIFIER.toUpperCase(Locale.ROOT));
} catch (Exception e) {
fail("Test failed due to exception: " + e.getMessage());
}