Skip to content

Commit

Permalink
[core] Fix migrating a Hive partitioned table that contains a null partition (#3445)
Browse files Browse the repository at this point in the history
  • Loading branch information
Zouxxyy authored May 31, 2024
1 parent 29de6dd commit 3fa7f57
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -172,18 +172,22 @@ private static DataFileMeta constructFileMeta(
public static BinaryRow writePartitionValue(
RowType partitionRowType,
Map<String, String> partitionValues,
List<BinaryWriter.ValueSetter> valueSetters) {
List<BinaryWriter.ValueSetter> valueSetters,
String partitionDefaultName) {

BinaryRow binaryRow = new BinaryRow(partitionRowType.getFieldCount());
BinaryRowWriter binaryRowWriter = new BinaryRowWriter(binaryRow);

List<DataField> fields = partitionRowType.getFields();

for (int i = 0; i < fields.size(); i++) {
Object value =
TypeUtils.castFromString(
partitionValues.get(fields.get(i).name()), fields.get(i).type());
valueSetters.get(i).setValue(binaryRowWriter, i, value);
String partitionName = partitionValues.get(fields.get(i).name());
if (partitionName.equals(partitionDefaultName)) {
binaryRowWriter.setNullAt(i);
} else {
Object value = TypeUtils.castFromString(partitionName, fields.get(i).type());
valueSetters.get(i).setValue(binaryRowWriter, i, value);
}
}
binaryRowWriter.complete();
return binaryRow;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@

import static org.apache.paimon.hive.HiveTypeUtils.toPaimonType;
import static org.apache.paimon.utils.FileUtils.COMMON_IO_FORK_JOIN_POOL;
import static org.apache.paimon.utils.Preconditions.checkArgument;

/** Migrate hive table to paimon table. */
public class HiveMigrator implements Migrator {
Expand All @@ -77,7 +78,7 @@ public class HiveMigrator implements Migrator {
private final String sourceTable;
private final String targetDatabase;
private final String targetTable;
private final Map<String, String> options;
private final CoreOptions coreOptions;
private Boolean delete = true;

public HiveMigrator(
Expand All @@ -94,7 +95,7 @@ public HiveMigrator(
this.sourceTable = sourceTable;
this.targetDatabase = targetDatabase;
this.targetTable = targetTable;
this.options = options;
this.coreOptions = new CoreOptions(options);
}

public static List<Migrator> databaseMigrators(
Expand Down Expand Up @@ -250,8 +251,10 @@ public Schema from(
List<FieldSchema> fields,
List<FieldSchema> partitionFields,
Map<String, String> hiveTableOptions) {
HashMap<String, String> paimonOptions = new HashMap<>(this.options);
paimonOptions.put(CoreOptions.BUCKET.key(), "-1");
checkArgument(
coreOptions.bucket() == -1,
"Hive migrator only support unaware-bucket target table, bucket should be -1");
Map<String, String> paimonOptions = coreOptions.toMap();
// for compatible with hive comment system
if (hiveTableOptions.get("comment") != null) {
paimonOptions.put("hive.comment", hiveTableOptions.get("comment"));
Expand Down Expand Up @@ -302,7 +305,11 @@ private List<MigrateTask> importPartitionedTableTask(
String format = parseFormat(partition.getSd().getSerdeInfo().toString());
String location = partition.getSd().getLocation();
BinaryRow partitionRow =
FileMetaUtils.writePartitionValue(partitionRowType, values, valueSetters);
FileMetaUtils.writePartitionValue(
partitionRowType,
values,
valueSetters,
coreOptions.partitionDefaultName());
Path path = paimonTable.store().pathFactory().bucketPath(partitionRow, 0);

migrateTasks.add(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public InternalRow[] call(InternalRow args) {
paimonCatalog.renameTable(tmpTableId, sourceTableId, false);
}
} catch (Exception e) {
throw new RuntimeException("Call migrate_table error", e);
throw new RuntimeException("Call migrate_table error: " + e.getMessage(), e);
}

return new InternalRow[] {newInternalRow(true)};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package org.apache.paimon.spark.procedure
import org.apache.paimon.spark.PaimonHiveTestBase

import org.apache.spark.sql.Row
import org.assertj.core.api.Assertions.assertThatThrownBy

class MigrateTableProcedureTest extends PaimonHiveTestBase {
Seq("parquet", "orc", "avro").foreach(
Expand Down Expand Up @@ -93,4 +94,51 @@ class MigrateTableProcedureTest extends PaimonHiveTestBase {
}
}
})

// Verifies that migrate_table handles Hive's "default" partition: a row inserted
// with a null partition value is stored by Hive under __HIVE_DEFAULT_PARTITION__,
// and after migration it must read back as a true null in the Paimon table.
test(s"Paimon migrate table procedure: migrate partitioned table with null partition") {
withTable("hive_tbl") {
// create hive table
spark.sql(s"""
|CREATE TABLE hive_tbl (id STRING, name STRING, pt INT)
|USING parquet
|PARTITIONED BY (pt)
|""".stripMargin)

// second row has a null partition value -> Hive places it in the default partition
spark.sql(s"INSERT INTO hive_tbl VALUES ('1', 'a', 1), ('2', 'b', null)")

// sanity check before migration: null partition value is readable
checkAnswer(
spark.sql(s"SELECT * FROM hive_tbl ORDER BY id"),
Row("1", "a", 1) :: Row("2", "b", null) :: Nil)

// migrate in place; partition.default-name tells the migrator which partition
// directory name maps back to a null partition value
spark.sql(
s"""CALL sys.migrate_table(source_type => 'hive', table => '$hiveDbName.hive_tbl',
|options => 'file.format=parquet,partition.default-name=__HIVE_DEFAULT_PARTITION__')
|""".stripMargin)

// after migration the same query runs against the Paimon table;
// the null partition row must be preserved
checkAnswer(
spark.sql(s"SELECT * FROM hive_tbl ORDER BY id"),
Row("1", "a", 1) :: Row("2", "b", null) :: Nil)
}
}

// Verifies that migrate_table rejects a bucketed target: the Hive migrator only
// supports unaware-bucket tables (bucket = -1), so passing bucket=1 must fail
// with a clear message rather than produce a broken table.
test(s"Paimon migrate table procedure: migrate table with wrong options") {
withTable("hive_tbl") {
// create hive table
spark.sql(s"""
|CREATE TABLE hive_tbl (id STRING, name STRING, pt INT)
|USING parquet
|PARTITIONED BY (pt)
|""".stripMargin)

spark.sql(s"INSERT INTO hive_tbl VALUES ('1', 'a', 1)")

// bucket=1 conflicts with the migrator's unaware-bucket requirement;
// the precondition message is surfaced through the procedure's wrapped exception
assertThatThrownBy(
() =>
spark.sql(
s"""CALL sys.migrate_table(source_type => 'hive', table => '$hiveDbName.hive_tbl',
|options => 'file.format=parquet,bucket=1')
|""".stripMargin))
.hasMessageContaining("Hive migrator only support unaware-bucket target table")
}
}
}

0 comments on commit 3fa7f57

Please sign in to comment.