Skip to content

Commit

Permalink
format
Browse files Browse the repository at this point in the history
  • Loading branch information
herefree committed Oct 22, 2024
1 parent daa2536 commit fc28544
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,9 @@ public static FormatTable convertToFormatTable(Table hiveTable) {
if (Options.fromMap(options).get(TYPE) == FORMAT_TABLE) {
format = Format.valueOf(options.get(FILE_FORMAT.key()).toUpperCase());
if (format.equals(Format.CSV)) {
options.put(FIELD_DELIMITER.key(), serdeInfo.getParameters().getOrDefault(FIELD_DELIM, "\u0001"));
options.put(
FIELD_DELIMITER.key(),
serdeInfo.getParameters().getOrDefault(FIELD_DELIM, "\u0001"));
}
} else {
String serLib = serdeInfo.getSerializationLib().toLowerCase();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,19 +138,23 @@ public void testPartitionTable() throws Exception {

@Test
public void testFlinkCreateCsvFormatTable() throws Exception {
tEnv.executeSql("CREATE TABLE flink_csv_table (a INT, b STRING) with ('type'='format-table', 'file.format'='csv')").await();
tEnv.executeSql(
"CREATE TABLE flink_csv_table (a INT, b STRING) with ('type'='format-table', 'file.format'='csv')")
.await();
doTestFormatTable("flink_csv_table");
}

@Test
public void testFlinkCreateFormatTableWithDelimiter() throws Exception {
tEnv.executeSql("CREATE TABLE flink_csv_table_delimiter (a INT, b STRING) with ('type'='format-table', 'file.format'='csv', 'csv.field-delimiter'=';')");
tEnv.executeSql(
"CREATE TABLE flink_csv_table_delimiter (a INT, b STRING) with ('type'='format-table', 'file.format'='csv', 'csv.field-delimiter'=';')");
doTestFormatTable("flink_csv_table_delimiter");
}

@Test
public void testFlinkCreatePartitionTable() throws Exception {
tEnv.executeSql("CREATE TABLE flink_partition_table (a INT,b STRING) PARTITIONED BY (b) with ('type'='format-table', 'file.format'='csv')");
tEnv.executeSql(
"CREATE TABLE flink_partition_table (a INT,b STRING) PARTITIONED BY (b) with ('type'='format-table', 'file.format'='csv')");
doTestFormatTable("flink_partition_table");
}

Expand Down

0 comments on commit fc28544

Please sign in to comment.