[core] Fix migrate hive partitioned table with null partition #3445

Merged · 4 commits · May 31, 2024
File: FileMetaUtils.java
@@ -172,18 +172,22 @@ private static DataFileMeta constructFileMeta(
     public static BinaryRow writePartitionValue(
             RowType partitionRowType,
             Map<String, String> partitionValues,
-            List<BinaryWriter.ValueSetter> valueSetters) {
+            List<BinaryWriter.ValueSetter> valueSetters,
+            String partitionDefaultName) {

         BinaryRow binaryRow = new BinaryRow(partitionRowType.getFieldCount());
         BinaryRowWriter binaryRowWriter = new BinaryRowWriter(binaryRow);

         List<DataField> fields = partitionRowType.getFields();

         for (int i = 0; i < fields.size(); i++) {
-            Object value =
-                    TypeUtils.castFromString(
-                            partitionValues.get(fields.get(i).name()), fields.get(i).type());
-            valueSetters.get(i).setValue(binaryRowWriter, i, value);
+            String partitionName = partitionValues.get(fields.get(i).name());
+            if (partitionName.equals(partitionDefaultName)) {
+                binaryRowWriter.setNullAt(i);
+            } else {
+                Object value = TypeUtils.castFromString(partitionName, fields.get(i).type());
+                valueSetters.get(i).setValue(binaryRowWriter, i, value);
+            }
         }
         binaryRowWriter.complete();
         return binaryRow;
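Context for this hunk: Hive cannot store a real null in a partition directory name, so it writes a sentinel instead (by default __HIVE_DEFAULT_PARTITION__, matching Paimon's partition.default-name option). The old code fed that sentinel straight into TypeUtils.castFromString, which cannot parse it for non-string partition types such as the INT partition exercised by the new test; the fix writes a true null into the partition row instead. A minimal standalone sketch of that mapping (plain Java collections stand in for Paimon's BinaryRow machinery; all names here are illustrative):

    import java.util.ArrayList;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    public class NullPartitionSketch {
        // Hive's default sentinel for a null partition value.
        static final String DEFAULT_NAME = "__HIVE_DEFAULT_PARTITION__";

        // Restore real nulls from a Hive partition spec such as {pt=__HIVE_DEFAULT_PARTITION__}.
        static List<Object> toPartitionValues(Map<String, String> partitionSpec) {
            List<Object> values = new ArrayList<>();
            for (String raw : partitionSpec.values()) {
                // The sentinel means "this partition value is null", not a literal string.
                values.add(DEFAULT_NAME.equals(raw) ? null : raw);
            }
            return values;
        }

        public static void main(String[] args) {
            Map<String, String> spec = new LinkedHashMap<>();
            spec.put("pt", DEFAULT_NAME);
            System.out.println(toPartitionValues(spec)); // prints [null]
        }
    }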
File: HiveMigrator.java
@@ -59,6 +59,7 @@

 import static org.apache.paimon.hive.HiveTypeUtils.toPaimonType;
 import static org.apache.paimon.utils.FileUtils.COMMON_IO_FORK_JOIN_POOL;
+import static org.apache.paimon.utils.Preconditions.checkArgument;

 /** Migrate hive table to paimon table. */
 public class HiveMigrator implements Migrator {
@@ -77,7 +78,7 @@ public class HiveMigrator {
     private final String sourceTable;
     private final String targetDatabase;
     private final String targetTable;
-    private final Map<String, String> options;
+    private final CoreOptions coreOptions;
     private Boolean delete = true;

     public HiveMigrator(
@@ -94,7 +95,7 @@ public HiveMigrator(
         this.sourceTable = sourceTable;
         this.targetDatabase = targetDatabase;
         this.targetTable = targetTable;
-        this.options = options;
+        this.coreOptions = new CoreOptions(options);
     }

     public static List<Migrator> databaseMigrators(
@@ -250,8 +251,10 @@ public Schema from(
             List<FieldSchema> fields,
             List<FieldSchema> partitionFields,
             Map<String, String> hiveTableOptions) {
-        HashMap<String, String> paimonOptions = new HashMap<>(this.options);
-        paimonOptions.put(CoreOptions.BUCKET.key(), "-1");
+        checkArgument(
+                coreOptions.bucket() == -1,
+                "Hive migrator only support unaware-bucket target table, bucket should be -1");
+        Map<String, String> paimonOptions = coreOptions.toMap();
         // for compatible with hive comment system
         if (hiveTableOptions.get("comment") != null) {
             paimonOptions.put("hive.comment", hiveTableOptions.get("comment"));
@@ -302,7 +305,11 @@ private List<MigrateTask> importPartitionedTableTask(
             String format = parseFormat(partition.getSd().getSerdeInfo().toString());
             String location = partition.getSd().getLocation();
             BinaryRow partitionRow =
-                    FileMetaUtils.writePartitionValue(partitionRowType, values, valueSetters);
+                    FileMetaUtils.writePartitionValue(
+                            partitionRowType,
+                            values,
+                            valueSetters,
+                            coreOptions.partitionDefaultName());
             Path path = paimonTable.store().pathFactory().bucketPath(partitionRow, 0);

             migrateTasks.add(
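Side note on bucketPath(partitionRow, 0): an unaware-bucket table (bucket = -1) keeps all files of a partition under a single bucket-0 directory, so the migrate task can target bucket index 0 unconditionally. An illustrative sketch of the resulting layout (not the real path-factory API):

    public class BucketPathSketch {
        // Paimon lays data out as {table}/{partition}/bucket-{n}; with bucket = -1
        // every file of a partition lands under bucket-0.
        static String taskPath(String tableRoot, String partitionDir) {
            return tableRoot + "/" + partitionDir + "/bucket-0";
        }

        public static void main(String[] args) {
            // A null partition migrates into the sentinel-named directory.
            System.out.println(taskPath("/warehouse/db.db/t", "pt=__HIVE_DEFAULT_PARTITION__"));
        }
    }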
File: MigrateTableProcedure.java
@@ -109,7 +109,7 @@ public InternalRow[] call(InternalRow args) {
                 paimonCatalog.renameTable(tmpTableId, sourceTableId, false);
             }
         } catch (Exception e) {
-            throw new RuntimeException("Call migrate_table error", e);
+            throw new RuntimeException("Call migrate_table error: " + e.getMessage(), e);
         }

         return new InternalRow[] {newInternalRow(true)};
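Why the message change matters: the procedure wraps every failure in a RuntimeException, and clients that surface only the outermost message would otherwise lose the root cause (for example the bucket check above); appending e.getMessage() is also what lets the new test match on the message text. A minimal sketch of the difference:

    public class WrapErrorSketch {
        static void migrate() {
            throw new IllegalArgumentException(
                    "Hive migrator only support unaware-bucket target table, bucket should be -1");
        }

        public static void main(String[] args) {
            try {
                migrate();
            } catch (Exception e) {
                // Before: "Call migrate_table error" hid the cause from message-only clients.
                // After: the cause's text rides along in the wrapper's own message.
                throw new RuntimeException("Call migrate_table error: " + e.getMessage(), e);
            }
        }
    }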
File: MigrateTableProcedureTest.scala
@@ -21,6 +21,7 @@ package org.apache.paimon.spark.procedure
 import org.apache.paimon.spark.PaimonHiveTestBase

 import org.apache.spark.sql.Row
+import org.assertj.core.api.Assertions.assertThatThrownBy

 class MigrateTableProcedureTest extends PaimonHiveTestBase {
   Seq("parquet", "orc", "avro").foreach(
@@ -93,4 +94,51 @@ class MigrateTableProcedureTest extends PaimonHiveTestBase {
         }
       }
     })
+
+  test(s"Paimon migrate table procedure: migrate partitioned table with null partition") {
+    withTable("hive_tbl") {
+      // create hive table
+      spark.sql(s"""
+                   |CREATE TABLE hive_tbl (id STRING, name STRING, pt INT)
+                   |USING parquet
+                   |PARTITIONED BY (pt)
+                   |""".stripMargin)
+
+      spark.sql(s"INSERT INTO hive_tbl VALUES ('1', 'a', 1), ('2', 'b', null)")
+
+      checkAnswer(
+        spark.sql(s"SELECT * FROM hive_tbl ORDER BY id"),
+        Row("1", "a", 1) :: Row("2", "b", null) :: Nil)
+
+      spark.sql(
+        s"""CALL sys.migrate_table(source_type => 'hive', table => '$hiveDbName.hive_tbl',
+           |options => 'file.format=parquet,partition.default-name=__HIVE_DEFAULT_PARTITION__')
+           |""".stripMargin)
+
+      checkAnswer(
+        spark.sql(s"SELECT * FROM hive_tbl ORDER BY id"),
+        Row("1", "a", 1) :: Row("2", "b", null) :: Nil)
+    }
+  }
+
+  test(s"Paimon migrate table procedure: migrate table with wrong options") {
+    withTable("hive_tbl") {
+      // create hive table
+      spark.sql(s"""
+                   |CREATE TABLE hive_tbl (id STRING, name STRING, pt INT)
+                   |USING parquet
+                   |PARTITIONED BY (pt)
+                   |""".stripMargin)
+
+      spark.sql(s"INSERT INTO hive_tbl VALUES ('1', 'a', 1)")
+
+      assertThatThrownBy(
+        () =>
+          spark.sql(
+            s"""CALL sys.migrate_table(source_type => 'hive', table => '$hiveDbName.hive_tbl',
+               |options => 'file.format=parquet,bucket=1')
+               |""".stripMargin))
+        .hasMessageContaining("Hive migrator only support unaware-bucket target table")
+    }
+  }
 }