diff --git a/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java b/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java
index 7c84c87d792e..2e4409eb6995 100644
--- a/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java
@@ -172,7 +172,8 @@ private static DataFileMeta constructFileMeta(
     public static BinaryRow writePartitionValue(
             RowType partitionRowType,
             Map<String, String> partitionValues,
-            List<BinaryWriter.ValueSetter> valueSetters) {
+            List<BinaryWriter.ValueSetter> valueSetters,
+            String partitionDefaultName) {
 
         BinaryRow binaryRow = new BinaryRow(partitionRowType.getFieldCount());
         BinaryRowWriter binaryRowWriter = new BinaryRowWriter(binaryRow);
@@ -180,10 +181,13 @@ public static BinaryRow writePartitionValue(
         List<DataField> fields = partitionRowType.getFields();
 
         for (int i = 0; i < fields.size(); i++) {
-            Object value =
-                    TypeUtils.castFromString(
-                            partitionValues.get(fields.get(i).name()), fields.get(i).type());
-            valueSetters.get(i).setValue(binaryRowWriter, i, value);
+            String partitionName = partitionValues.get(fields.get(i).name());
+            if (partitionName.equals(partitionDefaultName)) {
+                binaryRowWriter.setNullAt(i);
+            } else {
+                Object value = TypeUtils.castFromString(partitionName, fields.get(i).type());
+                valueSetters.get(i).setValue(binaryRowWriter, i, value);
+            }
         }
         binaryRowWriter.complete();
         return binaryRow;
diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HiveMigrator.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HiveMigrator.java
index a8516c3a4088..01c2704de00b 100644
--- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HiveMigrator.java
+++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HiveMigrator.java
@@ -59,6 +59,7 @@
 
 import static org.apache.paimon.hive.HiveTypeUtils.toPaimonType;
 import static org.apache.paimon.utils.FileUtils.COMMON_IO_FORK_JOIN_POOL;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /** Migrate hive table to paimon table. */
 public class HiveMigrator implements Migrator {
@@ -77,7 +78,7 @@ public class HiveMigrator implements Migrator {
     private final String sourceTable;
     private final String targetDatabase;
    private final String targetTable;
-    private final Map<String, String> options;
+    private final CoreOptions coreOptions;
     private Boolean delete = true;
 
     public HiveMigrator(
@@ -94,7 +95,7 @@ public HiveMigrator(
         this.sourceTable = sourceTable;
         this.targetDatabase = targetDatabase;
         this.targetTable = targetTable;
-        this.options = options;
+        this.coreOptions = new CoreOptions(options);
     }
 
     public static List<Migrator> databaseMigrators(
@@ -250,8 +251,10 @@ public Schema from(
             List<FieldSchema> fields,
             List<FieldSchema> partitionFields,
             Map<String, String> hiveTableOptions) {
-        HashMap<String, String> paimonOptions = new HashMap<>(this.options);
-        paimonOptions.put(CoreOptions.BUCKET.key(), "-1");
+        checkArgument(
+                coreOptions.bucket() == -1,
+                "Hive migrator only support unaware-bucket target table, bucket should be -1");
+        Map<String, String> paimonOptions = coreOptions.toMap();
         // for compatible with hive comment system
         if (hiveTableOptions.get("comment") != null) {
             paimonOptions.put("hive.comment", hiveTableOptions.get("comment"));
@@ -302,7 +305,11 @@ private List<MigrateTask> importPartitionedTableTask(
             String format = parseFormat(partition.getSd().getSerdeInfo().toString());
             String location = partition.getSd().getLocation();
             BinaryRow partitionRow =
-                    FileMetaUtils.writePartitionValue(partitionRowType, values, valueSetters);
+                    FileMetaUtils.writePartitionValue(
+                            partitionRowType,
+                            values,
+                            valueSetters,
+                            coreOptions.partitionDefaultName());
             Path path = paimonTable.store().pathFactory().bucketPath(partitionRow, 0);
 
             migrateTasks.add(
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/MigrateTableProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/MigrateTableProcedure.java
index 87090dd921be..30b97b33f838 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/MigrateTableProcedure.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/MigrateTableProcedure.java
@@ -109,7 +109,7 @@ public InternalRow[] call(InternalRow args) {
                 paimonCatalog.renameTable(tmpTableId, sourceTableId, false);
             }
         } catch (Exception e) {
-            throw new RuntimeException("Call migrate_table error", e);
+            throw new RuntimeException("Call migrate_table error: " + e.getMessage(), e);
         }
 
         return new InternalRow[] {newInternalRow(true)};
diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/MigrateTableProcedureTest.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/MigrateTableProcedureTest.scala
index 457b33e51ad6..710d4dbfa8d3 100644
--- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/MigrateTableProcedureTest.scala
+++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/MigrateTableProcedureTest.scala
@@ -21,6 +21,7 @@ package org.apache.paimon.spark.procedure
 import org.apache.paimon.spark.PaimonHiveTestBase
 
 import org.apache.spark.sql.Row
+import org.assertj.core.api.Assertions.assertThatThrownBy
 
 class MigrateTableProcedureTest extends PaimonHiveTestBase {
   Seq("parquet", "orc", "avro").foreach(
@@ -93,4 +94,51 @@ class MigrateTableProcedureTest extends PaimonHiveTestBase {
       }
     }
   })
+
+  test(s"Paimon migrate table procedure: migrate partitioned table with null partition") {
+    withTable("hive_tbl") {
+      // create hive table
+      spark.sql(s"""
+                   |CREATE TABLE hive_tbl (id STRING, name STRING, pt INT)
+                   |USING parquet
+                   |PARTITIONED BY (pt)
+                   |""".stripMargin)
+
+      spark.sql(s"INSERT INTO hive_tbl VALUES ('1', 'a', 1), ('2', 'b', null)")
+
+      checkAnswer(
+        spark.sql(s"SELECT * FROM hive_tbl ORDER BY id"),
+        Row("1", "a", 1) :: Row("2", "b", null) :: Nil)
+
+      spark.sql(
+        s"""CALL sys.migrate_table(source_type => 'hive', table => '$hiveDbName.hive_tbl',
+           |options => 'file.format=parquet,partition.default-name=__HIVE_DEFAULT_PARTITION__')
+           |""".stripMargin)
+
+      checkAnswer(
+        spark.sql(s"SELECT * FROM hive_tbl ORDER BY id"),
+        Row("1", "a", 1) :: Row("2", "b", null) :: Nil)
+    }
+  }
+
+  test(s"Paimon migrate table procedure: migrate table with wrong options") {
+    withTable("hive_tbl") {
+      // create hive table
+      spark.sql(s"""
+                   |CREATE TABLE hive_tbl (id STRING, name STRING, pt INT)
+                   |USING parquet
+                   |PARTITIONED BY (pt)
+                   |""".stripMargin)
+
+      spark.sql(s"INSERT INTO hive_tbl VALUES ('1', 'a', 1)")
+
+      assertThatThrownBy(
+        () =>
+          spark.sql(
+            s"""CALL sys.migrate_table(source_type => 'hive', table => '$hiveDbName.hive_tbl',
+               |options => 'file.format=parquet,bucket=1')
+               |""".stripMargin))
+        .hasMessageContaining("Hive migrator only support unaware-bucket target table")
+    }
+  }
 }