[migrate] Support migrate table with specific separator #4000

Closed · wants to merge 3 commits
@@ -35,9 +35,13 @@ public static List<Map<String, String>> getPartitions(String... partitionStrings
     }
 
     public static Map<String, String> parseCommaSeparatedKeyValues(String keyValues) {
+        return parseSeparatedKeyValues(keyValues, ",");
+    }
+
+    public static Map<String, String> parseSeparatedKeyValues(String keyValues, String separator) {
         Map<String, String> kvs = new HashMap<>();
         if (!StringUtils.isBlank(keyValues)) {
-            for (String kvString : keyValues.split(",")) {
+            for (String kvString : keyValues.split(separator)) {
                 parseKeyValueString(kvs, kvString);
             }
         }
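For orientation, here is a minimal sketch (not part of this PR) of how the new helper is meant to be used; the import path for ParameterUtils and the demo class are assumptions:

import java.util.Map;

import org.apache.paimon.utils.ParameterUtils; // assumed package

public class SeparatorDemo {

    public static void main(String[] args) {
        // "orc.encrypt=pii:id,name" carries a comma inside its value, so it cannot
        // survive the comma-separated parser. With an explicit ';' separator the
        // embedded comma is preserved.
        Map<String, String> kvs =
                ParameterUtils.parseSeparatedKeyValues(
                        "file.format=orc;orc.encrypt=pii:id,name", ";");
        System.out.println(kvs); // e.g. {file.format=orc, orc.encrypt=pii:id,name} (HashMap order may vary)

        // The old entry point still works and now simply delegates with ",".
        // Note that String#split treats the separator as a regex, so callers
        // passing metacharacters such as "|" would need Pattern.quote.
        Map<String, String> defaults = ParameterUtils.parseCommaSeparatedKeyValues("a=1,b=2");
        System.out.println(defaults); // e.g. {a=1, b=2}
    }
}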
@@ -31,23 +31,31 @@ public class MigrateTableAction extends ActionBase {
     private final String hiveTableFullName;
     private final String tableProperties;
 
+    private final String separator;
+
     public MigrateTableAction(
             String connector,
             String warehouse,
             String hiveTableFullName,
             Map<String, String> catalogConfig,
-            String tableProperties) {
+            String tableProperties,
+            String separator) {
         super(warehouse, catalogConfig);
         this.connector = connector;
         this.hiveTableFullName = hiveTableFullName;
         this.tableProperties = tableProperties;
+        this.separator = separator;
     }
 
     @Override
     public void run() throws Exception {
         MigrateTableProcedure migrateTableProcedure = new MigrateTableProcedure();
         migrateTableProcedure.withCatalog(catalog);
         migrateTableProcedure.call(
-                new DefaultProcedureContext(env), connector, hiveTableFullName, tableProperties);
+                new DefaultProcedureContext(env),
+                connector,
+                hiveTableFullName,
+                tableProperties,
+                separator);
     }
 }
@@ -28,6 +28,7 @@ public class MigrateTableActionFactory implements ActionFactory {
 
     private static final String SOURCE_TYPE = "source_type";
     private static final String OPTIONS = "options";
+    private static final String SEPARATOR = "separator";
 
     @Override
     public String identifier() {
@@ -41,10 +42,11 @@ public Optional<Action> create(MultipleParameterToolAdapter params) {
         String sourceHiveTable = params.get(TABLE);
         Map<String, String> catalogConfig = optionalConfigMap(params, CATALOG_CONF);
         String tableConf = params.get(OPTIONS);
+        String separator = params.get(SEPARATOR) == null ? "," : params.get(SEPARATOR);
 
         MigrateTableAction migrateTableAction =
                 new MigrateTableAction(
-                        connector, warehouse, sourceHiveTable, catalogConfig, tableConf);
+                        connector, warehouse, sourceHiveTable, catalogConfig, tableConf, separator);
         return Optional.of(migrateTableAction);
     }
 
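Taken together, the factory turns an optional "separator" command-line argument (defaulting to ",") into the new sixth constructor parameter of MigrateTableAction. A rough sketch of the equivalent programmatic wiring, with placeholder warehouse path and catalog options, mirroring the updated testMigrateAction further down:

import java.util.HashMap;
import java.util.Map;

import org.apache.paimon.flink.action.MigrateTableAction; // assumed package

public class MigrateWithSeparator {

    public static void main(String[] args) throws Exception {
        Map<String, String> catalogConfig = new HashMap<>();
        catalogConfig.put("metastore", "hive");
        catalogConfig.put("uri", "thrift://localhost:9083"); // placeholder URI

        // Same argument order as the new six-argument constructor above:
        // connector, warehouse, source table, catalog config, table options, separator.
        MigrateTableAction action =
                new MigrateTableAction(
                        "hive",
                        "/path/to/warehouse", // placeholder warehouse
                        "default.hivetable",
                        catalogConfig,
                        "file.format=orc;orc.encrypt=pii:id,name",
                        ";");
        action.run();
    }
}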
@@ -50,6 +50,16 @@ public String[] call(
             String sourceTablePath,
             String properties)
             throws Exception {
+        return call(procedureContext, connector, sourceTablePath, properties, ",");
+    }
+
+    public String[] call(
+            ProcedureContext procedureContext,
+            String connector,
+            String sourceTablePath,
+            String properties,
+            String separator)
+            throws Exception {
         String targetPaimonTablePath = sourceTablePath + PAIMON_SUFFIX;
 
         Identifier sourceTableId = Identifier.fromString(sourceTablePath);
@@ -62,7 +72,7 @@
                         sourceTableId.getObjectName(),
                         targetTableId.getDatabaseName(),
                         targetTableId.getObjectName(),
-                        ParameterUtils.parseCommaSeparatedKeyValues(properties))
+                        ParameterUtils.parseSeparatedKeyValues(properties, separator))
                 .executeMigrate();
 
         LOG.info("Last step: rename " + targetTableId + " to " + sourceTableId);
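The new overload keeps the old four-argument entry point source-compatible by delegating with ",". From Flink SQL the separator is simply an optional trailing argument; a sketch under the assumption of an already-registered Paimon catalog (the test below does the full catalog setup):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class CallMigrateTable {

    public static void main(String[] args) throws Exception {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());
        // Catalog DDL elided; see the test below for a complete setup.

        // Old behavior: options split on ','.
        tEnv.executeSql(
                        "CALL sys.migrate_table('hive', 'default.hivetable', 'file.format=orc')")
                .await();

        // New behavior: options split on ';' so values may contain commas.
        tEnv.executeSql(
                        "CALL sys.migrate_table('hive', 'default.hivetable', "
                                + "'file.format=orc;orc.encrypt=pii:id,name', ';')")
                .await();
    }
}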
@@ -70,6 +70,8 @@ public void testAvro() throws Exception {
         testUpgradeNonPartitionTable("avro");
         resetMetastore();
         testUpgradePartitionTable("avro");
+        resetMetastore();
+        testUpgradePartitionTableWithSeparator("avro", ";");
     }
 
     @Test
@@ -118,6 +120,45 @@ public void testUpgradePartitionTable(String format) throws Exception {
         Assertions.assertThatList(r1).containsExactlyInAnyOrderElementsOf(r2);
     }
 
+    public void testUpgradePartitionTableWithSeparator(String format, String separator)
+            throws Exception {
+        TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build();
+        tEnv.executeSql("CREATE CATALOG HIVE WITH ('type'='hive')");
+        tEnv.useCatalog("HIVE");
+        tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
+        tEnv.executeSql(
+                "CREATE TABLE hivetable (id string, name string) PARTITIONED BY (id2 int, id3 int) STORED AS "
+                        + format);
+        tEnv.executeSql("INSERT INTO hivetable VALUES ('a', 'bb', 2, 3)").await();
+        tEnv.executeSql("SHOW CREATE TABLE hivetable");
+
+        tEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
+        tEnv.executeSql("CREATE CATALOG PAIMON_GE WITH ('type'='paimon-generic')");
+        tEnv.useCatalog("PAIMON_GE");
+        List<Row> r1 = ImmutableList.copyOf(tEnv.executeSql("SELECT * FROM hivetable").collect());
+
+        tEnv.executeSql(
+                "CREATE CATALOG PAIMON WITH ('type'='paimon', 'metastore' = 'hive', 'uri' = 'thrift://localhost:"
+                        + PORT
+                        + "' , 'warehouse' = '"
+                        + System.getProperty(HiveConf.ConfVars.METASTOREWAREHOUSE.varname)
+                        + "')");
+        tEnv.useCatalog("PAIMON");
+        tEnv.executeSql(
+                        "CALL sys.migrate_table('hive', 'default.hivetable', 'file.format="
+                                + format
+                                + ";orc.encrypt=pii:id,name', '"
+                                + separator
+                                + "')")
+                .await();
+        List<Row> r2 = ImmutableList.copyOf(tEnv.executeSql("SELECT * FROM hivetable").collect());
+        Assertions.assertThatList(r1).containsExactlyInAnyOrderElementsOf(r2);
+
+        List<Row> r3 =
+                ImmutableList.copyOf(tEnv.executeSql("SHOW CREATE TABLE hivetable").collect());
+        Assertions.assertThat(r3.get(0).toString()).contains("'orc.encrypt' = 'pii:id,name',");
+    }
+
     public void testUpgradeNonPartitionTable(String format) throws Exception {
         TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build();
         tEnv.executeSql("CREATE CATALOG HIVE WITH ('type'='hive')");
@@ -175,7 +216,8 @@ public void testMigrateAction(String format) throws Exception {
                         System.getProperty(HiveConf.ConfVars.METASTOREWAREHOUSE.varname),
                         "default.hivetable",
                         catalogConf,
-                        "");
+                        "",
+                        ",");
         migrateTableAction.run();
 
         tEnv.executeSql(
@@ -53,7 +53,8 @@ public class MigrateTableProcedure extends BaseProcedure {
                 ProcedureParameter.required("table", StringType),
                 ProcedureParameter.optional("options", StringType),
                 ProcedureParameter.optional("delete_origin", BooleanType),
-                ProcedureParameter.optional("target_table", StringType)
+                ProcedureParameter.optional("target_table", StringType),
+                ProcedureParameter.optional("separator", StringType)
             };
 
     private static final StructType OUTPUT_TYPE =
@@ -83,6 +84,7 @@ public InternalRow[] call(InternalRow args) {
         String properties = args.isNullAt(2) ? null : args.getString(2);
         boolean deleteNeed = args.isNullAt(3) || args.getBoolean(3);
         String targetTable = args.isNullAt(4) ? null : args.getString(4);
+        String separator = args.isNullAt(5) ? "," : args.getString(5);
 
         Identifier sourceTableId = Identifier.fromString(sourceTable);
         Identifier tmpTableId =
@@ -101,7 +103,7 @@
                         sourceTableId.getObjectName(),
                         tmpTableId.getDatabaseName(),
                         tmpTableId.getObjectName(),
-                        ParameterUtils.parseCommaSeparatedKeyValues(properties));
+                        ParameterUtils.parseSeparatedKeyValues(properties, separator));
 
         migrator.deleteOriginTable(deleteNeed);
         migrator.executeMigrate();
@@ -46,6 +46,36 @@ class MigrateTableProcedureTest extends PaimonHiveTestBase {
       }
     })
 
+  Seq("parquet", "orc", "avro").foreach(
+    format => {
+      test(s"Paimon migrate table procedure: migrate $format table with separator") {
+        withTable("hive_tbl") {
+          // create hive table
+          spark.sql(s"""
+                       |CREATE TABLE hive_tbl (id STRING, name STRING, pt STRING)
+                       |USING $format
+                       |""".stripMargin)
+
+          spark.sql(s"INSERT INTO hive_tbl VALUES ('1', 'a', 'p1'), ('2', 'b', 'p2')")
+
+          spark.sql(
+            s"CALL sys.migrate_table(source_type => 'hive', table => '$hiveDbName.hive_tbl', options => 'file.format=$format;orc.encrypt=pii:id,name', separator => ';')")
+
+          checkAnswer(
+            spark.sql(s"SELECT * FROM hive_tbl ORDER BY id"),
+            Row("1", "a", "p1") :: Row("2", "b", "p2") :: Nil)
+
+          assert(
+            spark
+              .sql("SHOW CREATE TABLE hive_tbl")
+              .collect()
+              .apply(0)
+              .toString()
+              .contains("'orc.encrypt' = 'pii:id,name',"))
+        }
+      }
+    })
+
   Seq("parquet", "orc", "avro").foreach(
     format => {
       test(