support setting csv.field-delimiter when creating a format table
herefree committed Oct 22, 2024
1 parent f427fca commit daa2536
Showing 3 changed files with 40 additions and 10 deletions.
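
This commit carries a format table's csv.field-delimiter option into the Hive metastore: on the write path the delimiter is stored as the SerDe's field.delim property, and on the read path it is recovered from that property. A minimal usage sketch, assuming a Flink TableEnvironment named tEnv as in the tests below; the table name is illustrative:

    // Create a CSV format table whose columns are separated by ';' rather than
    // Hive's default Ctrl-A delimiter; the catalog persists the option to the
    // HMS SerDe as field.delim.
    tEnv.executeSql(
            "CREATE TABLE csv_demo (a INT, b STRING) WITH ("
                    + "'type'='format-table', "
                    + "'file.format'='csv', "
                    + "'csv.field-delimiter'=';')");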
File 1 of 3:
@@ -85,6 +85,7 @@
import java.util.stream.Collectors;

import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREWAREHOUSE;
+ import static org.apache.hadoop.hive.serde.serdeConstants.FIELD_DELIM;
import static org.apache.paimon.CoreOptions.FILE_FORMAT;
import static org.apache.paimon.CoreOptions.TYPE;
import static org.apache.paimon.TableType.FORMAT_TABLE;
@@ -99,6 +100,7 @@
import static org.apache.paimon.options.CatalogOptions.SYNC_ALL_PROPERTIES;
import static org.apache.paimon.options.CatalogOptions.TABLE_TYPE;
import static org.apache.paimon.options.OptionsUtils.convertToPropertiesPrefixKey;
+ import static org.apache.paimon.table.FormatTableOptions.FIELD_DELIMITER;
import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;
import static org.apache.paimon.utils.HadoopUtils.addHadoopConfIfFound;
import static org.apache.paimon.utils.Preconditions.checkArgument;
@@ -600,11 +602,11 @@ protected void createTableImpl(Identifier identifier, Schema schema) {

private Table createHiveTable(Identifier identifier, TableSchema tableSchema) {
Map<String, String> tblProperties;
String provider = "paimon";
String provider = PAIMON_TABLE_TYPE_VALUE;
if (Options.fromMap(tableSchema.options()).get(TYPE) == FORMAT_TABLE) {
provider = tableSchema.options().get(FILE_FORMAT.key());
}
- if (syncAllProperties() || !provider.equals("paimon")) {
+ if (syncAllProperties() || !provider.equals(PAIMON_TABLE_TYPE_VALUE)) {
tblProperties = new HashMap<>(tableSchema.options());

// add primary-key, partition-key to tblproperties
@@ -812,7 +814,7 @@ private Table newHmsTable(
hiveConf.get(TABLE_TYPE.key(), CatalogTableType.MANAGED.toString()),
CatalogTableType.class);
if (provider == null) {
provider = "paimon";
provider = PAIMON_TABLE_TYPE_VALUE;
}
Table table =
new Table(
@@ -830,7 +832,7 @@
null,
tableType.toString().toUpperCase(Locale.ROOT) + "_TABLE");
table.getParameters().put(TABLE_TYPE_PROP, provider.toUpperCase());
if ("paimon".equalsIgnoreCase(provider)) {
if (PAIMON_TABLE_TYPE_VALUE.equalsIgnoreCase(provider)) {
table.getParameters()
.put(hive_metastoreConstants.META_TABLE_STORAGE, STORAGE_HANDLER_CLASS_NAME);
} else {
@@ -844,7 +846,7 @@
}

private String getSerdeClassName(String provider) {
- if (provider == null || provider.equalsIgnoreCase("paimon")) {
+ if (provider == null || provider.equalsIgnoreCase(PAIMON_TABLE_TYPE_VALUE)) {
return SERDE_CLASS_NAME;
} else if (provider.equalsIgnoreCase("csv")) {
return "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe";
@@ -858,7 +860,7 @@ private String getSerdeClassName(String provider) {
}

private String getInputFormatName(String provider) {
- if (provider == null || provider.equalsIgnoreCase("paimon")) {
+ if (provider == null || provider.equalsIgnoreCase(PAIMON_TABLE_TYPE_VALUE)) {
return INPUT_FORMAT_CLASS_NAME;
} else if (provider.equalsIgnoreCase("csv")) {
return "org.apache.hadoop.mapred.TextInputFormat";
@@ -872,7 +874,7 @@ private String getInputFormatName(String provider) {
}

private String getOutputFormatClassName(String provider) {
- if (provider == null || provider.equalsIgnoreCase("paimon")) {
+ if (provider == null || provider.equalsIgnoreCase(PAIMON_TABLE_TYPE_VALUE)) {
return OUTPUT_FORMAT_CLASS_NAME;
} else if (provider.equalsIgnoreCase("csv")) {
return "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat";
@@ -893,7 +895,7 @@ private void updateHmsTable(
sd.setOutputFormat(getOutputFormatClassName(provider));

SerDeInfo serDeInfo = sd.getSerdeInfo() != null ? sd.getSerdeInfo() : new SerDeInfo();
- serDeInfo.setParameters(new HashMap<>());
+ serDeInfo.setParameters(setSerDeInfoParam(provider));
serDeInfo.setSerializationLib(getSerdeClassName(provider));
sd.setSerdeInfo(serDeInfo);

@@ -946,6 +948,14 @@ private void updateHmsTable(
locationHelper.specifyTableLocation(table, getTableLocation(identifier).toString());
}

+ private Map<String, String> setSerDeInfoParam(String provider) {
+ Map<String, String> param = new HashMap<>();
+ if (provider.equalsIgnoreCase("csv")) {
+ param.put(FIELD_DELIM, options.get(FIELD_DELIMITER));
+ }
+ return param;
+ }
+
private void updateHmsTablePars(Table table, TableSchema schema) {
if (syncAllProperties()) {
table.getParameters().putAll(schema.options());
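For orientation, the metastore entry produced for a ';'-delimited CSV format table now looks roughly like this; values are illustrative, with the class names taken from the helpers above:

    // Illustrative HMS storage descriptor after createTableImpl:
    //   serde lib:        org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
    //   serde parameters: field.delim=;
    //   input format:     org.apache.hadoop.mapred.TextInputFormat
    //   output format:    org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
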
File 2 of 3:
@@ -58,11 +58,13 @@ public static FormatTable convertToFormatTable(Table hiveTable) {
String comment = options.remove(COMMENT_PROP);
String location = hiveTable.getSd().getLocation();
Format format;
+ SerDeInfo serdeInfo = hiveTable.getSd().getSerdeInfo();
if (Options.fromMap(options).get(TYPE) == FORMAT_TABLE) {
format = Format.valueOf(options.get(FILE_FORMAT.key()).toUpperCase());
+ // for CSV, take the field delimiter from the SerDe properties as-is
+ if (format.equals(Format.CSV)) {
+ options.put(FIELD_DELIMITER.key(), serdeInfo.getParameters().getOrDefault(FIELD_DELIM, "\u0001"));
+ }
} else {
- SerDeInfo serdeInfo = hiveTable.getSd().getSerdeInfo();
String serLib = serdeInfo.getSerializationLib().toLowerCase();
String inputFormat = hiveTable.getSd().getInputFormat();
if (serLib.contains("parquet")) {
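When no field.delim parameter was stored, the read path falls back to "\u0001" (Ctrl-A), Hive's default field delimiter for text tables, so tables created before this change keep their old behavior. The fallback in isolation, assuming serdeInfo and options are populated as in the method above:

    // Recover the delimiter stored by the catalog; when none is present,
    // fall back to Hive's default Ctrl-A delimiter.
    String delimiter = serdeInfo.getParameters().getOrDefault(FIELD_DELIM, "\u0001");
    options.put(FIELD_DELIMITER.key(), delimiter);
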
File 3 of 3:
@@ -136,6 +136,24 @@ public void testPartitionTable() throws Exception {
doTestFormatTable("partition_table");
}

+ @Test
+ public void testFlinkCreateCsvFormatTable() throws Exception {
+ tEnv.executeSql("CREATE TABLE flink_csv_table (a INT, b STRING) with ('type'='format-table', 'file.format'='csv')").await();
+ doTestFormatTable("flink_csv_table");
+ }
+
+ @Test
+ public void testFlinkCreateFormatTableWithDelimiter() throws Exception {
+ tEnv.executeSql("CREATE TABLE flink_csv_table_delimiter (a INT, b STRING) with ('type'='format-table', 'file.format'='csv', 'csv.field-delimiter'=';')");
+ doTestFormatTable("flink_csv_table_delimiter");
+ }
+
+ @Test
+ public void testFlinkCreatePartitionTable() throws Exception {
+ tEnv.executeSql("CREATE TABLE flink_partition_table (a INT,b STRING) PARTITIONED BY (b) with ('type'='format-table', 'file.format'='csv')");
+ doTestFormatTable("flink_partition_table");
+ }
+
private void doTestFormatTable(String tableName) throws Exception {
hiveShell.execute(
String.format("INSERT INTO %s VALUES (100, 'Hive'), (200, 'Table')", tableName));
