Skip to content

Commit

Permalink
[hive] HiveMigrator support set parallelism for procedures (apache#4177)
Browse files Browse the repository at this point in the history
  • Loading branch information
xuzifu666 authored Sep 13, 2024
1 parent 647865f commit 983a552
Show file tree
Hide file tree
Showing 23 changed files with 259 additions and 35 deletions.
8 changes: 5 additions & 3 deletions docs/content/spark/procedures.md
Original file line number Diff line number Diff line change
Expand Up @@ -139,9 +139,10 @@ This section introduce all available spark procedures about paimon.
<li>options: the table options of the paimon table to migrate.</li>
<li>target_table: name of the target paimon table to migrate. If not set would keep the same name with origin table</li>
<li>delete_origin: If had set target_table, can set delete_origin to decide whether delete the origin table metadata from hms after migrate. Default is true</li>
<li>options_map: Options map for adding key-value options which is a map.</li>
<li>options_map: Options map for adding key-value options which is a map.</li>
<li>parallelism: the parallelism for migrate process, default is core numbers of machine.</li>
</td>
<td>CALL sys.migrate_table(source_type => 'hive', table => 'default.T', options => 'file.format=parquet', options_map => map('k1','v1'))</td>
<td>CALL sys.migrate_table(source_type => 'hive', table => 'default.T', options => 'file.format=parquet', options_map => map('k1','v1'), parallelism => 6)</td>
</tr>
<tr>
<td>migrate_file</td>
Expand All @@ -151,8 +152,9 @@ This section introduce all available spark procedures about paimon.
<li>source_table: name of the origin table to migrate. Cannot be empty.</li>
<li>target_table: name of the target table to be migrated. Cannot be empty.</li>
<li>delete_origin: If had set target_table, can set delete_origin to decide whether delete the origin table metadata from hms after migrate. Default is true</li>
<li>parallelism: the parallelism for migrate process, default is core numbers of machine.</li>
</td>
<td>CALL sys.migrate_file(source_type => 'hive', table => 'default.T', delete_origin => true)</td>
<td>CALL sys.migrate_file(source_type => 'hive', table => 'default.T', delete_origin => true, parallelism => 6)</td>
</tr>
<tr>
<td>remove_orphan_files</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,31 @@ public String[] call(
connector,
catalog,
sourceDatabasePath,
Runtime.getRuntime().availableProcessors(),
ParameterUtils.parseCommaSeparatedKeyValues(properties));

for (Migrator migrator : migrators) {
migrator.executeMigrate();
migrator.renameTable(false);
}

return new String[] {"Success"};
}

public String[] call(
ProcedureContext procedureContext,
String connector,
String sourceDatabasePath,
String properties,
Integer parallelism)
throws Exception {
Integer p = parallelism == null ? Runtime.getRuntime().availableProcessors() : parallelism;
List<Migrator> migrators =
TableMigrationUtils.getImporters(
connector,
catalog,
sourceDatabasePath,
p,
ParameterUtils.parseCommaSeparatedKeyValues(properties));

for (Migrator migrator : migrators) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,12 @@ public String[] call(
String sourceTablePath,
String targetPaimonTablePath)
throws Exception {
call(procedureContext, connector, sourceTablePath, targetPaimonTablePath, true);
migrateHandle(
connector,
sourceTablePath,
targetPaimonTablePath,
true,
Runtime.getRuntime().availableProcessors());
return new String[] {"Success"};
}

Expand All @@ -51,15 +56,34 @@ public String[] call(
String targetPaimonTablePath,
boolean deleteOrigin)
throws Exception {
migrateHandle(connector, sourceTablePath, targetPaimonTablePath, deleteOrigin);
migrateHandle(
connector,
sourceTablePath,
targetPaimonTablePath,
deleteOrigin,
Runtime.getRuntime().availableProcessors());
return new String[] {"Success"};
}

public String[] call(
ProcedureContext procedureContext,
String connector,
String sourceTablePath,
String targetPaimonTablePath,
boolean deleteOrigin,
Integer parallelism)
throws Exception {
Integer p = parallelism == null ? Runtime.getRuntime().availableProcessors() : parallelism;
migrateHandle(connector, sourceTablePath, targetPaimonTablePath, deleteOrigin, p);
return new String[] {"Success"};
}

public void migrateHandle(
String connector,
String sourceTablePath,
String targetPaimonTablePath,
boolean deleteOrigin)
boolean deleteOrigin,
Integer parallelism)
throws Exception {
Identifier sourceTableId = Identifier.fromString(sourceTablePath);
Identifier targetTableId = Identifier.fromString(targetPaimonTablePath);
Expand All @@ -77,6 +101,7 @@ public void migrateHandle(
sourceTableId.getObjectName(),
targetTableId.getDatabaseName(),
targetTableId.getObjectName(),
parallelism,
Collections.emptyMap());
importer.deleteOriginTable(deleteOrigin);
importer.executeMigrate();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,35 @@ public String[] call(
sourceTableId.getObjectName(),
targetTableId.getDatabaseName(),
targetTableId.getObjectName(),
Runtime.getRuntime().availableProcessors(),
ParameterUtils.parseCommaSeparatedKeyValues(properties))
.executeMigrate();

LOG.info("Last step: rename " + targetTableId + " to " + sourceTableId);
catalog.renameTable(targetTableId, sourceTableId, false);
return new String[] {"Success"};
}

public String[] call(
ProcedureContext procedureContext,
String connector,
String sourceTablePath,
String properties,
Integer parallelism)
throws Exception {
String targetPaimonTablePath = sourceTablePath + PAIMON_SUFFIX;

Identifier sourceTableId = Identifier.fromString(sourceTablePath);
Identifier targetTableId = Identifier.fromString(targetPaimonTablePath);

TableMigrationUtils.getImporter(
connector,
catalog,
sourceTableId.getDatabaseName(),
sourceTableId.getObjectName(),
targetTableId.getDatabaseName(),
targetTableId.getObjectName(),
parallelism,
ParameterUtils.parseCommaSeparatedKeyValues(properties))
.executeMigrate();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,24 +29,31 @@ public class MigrateDatabaseAction extends ActionBase {
private final String connector;
private final String hiveDatabaseName;
private final String tableProperties;
private final Integer parallelism;

public MigrateDatabaseAction(
String connector,
String warehouse,
String hiveDatabaseName,
Map<String, String> catalogConfig,
String tableProperties) {
String tableProperties,
Integer parallelism) {
super(warehouse, catalogConfig);
this.connector = connector;
this.hiveDatabaseName = hiveDatabaseName;
this.tableProperties = tableProperties;
this.parallelism = parallelism;
}

@Override
public void run() throws Exception {
MigrateDatabaseProcedure migrateDatabaseProcedure = new MigrateDatabaseProcedure();
migrateDatabaseProcedure.withCatalog(catalog);
migrateDatabaseProcedure.call(
new DefaultProcedureContext(env), connector, hiveDatabaseName, tableProperties);
new DefaultProcedureContext(env),
connector,
hiveDatabaseName,
tableProperties,
parallelism);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ public class MigrateDatabaseActionFactory implements ActionFactory {

private static final String SOURCE_TYPE = "source_type";
private static final String OPTIONS = "options";
private static final String PARALLELISM = "parallelism";

@Override
public String identifier() {
Expand All @@ -41,10 +42,16 @@ public Optional<Action> create(MultipleParameterToolAdapter params) {
String sourceHiveDatabase = params.get(DATABASE);
Map<String, String> catalogConfig = optionalConfigMap(params, CATALOG_CONF);
String tableConf = params.get(OPTIONS);
Integer parallelism = Integer.parseInt(params.get(PARALLELISM));

MigrateDatabaseAction migrateDatabaseAction =
new MigrateDatabaseAction(
connector, warehouse, sourceHiveDatabase, catalogConfig, tableConf);
connector,
warehouse,
sourceHiveDatabase,
catalogConfig,
tableConf,
parallelism);
return Optional.of(migrateDatabaseAction);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public class MigrateFileAction extends ActionBase {
private final String targetTable;
private final String tableProperties;
private boolean deleteOrigin;
private Integer parallelism;

public MigrateFileAction(
String connector,
Expand All @@ -40,13 +41,15 @@ public MigrateFileAction(
String targetTable,
boolean deleteOrigin,
Map<String, String> catalogConfig,
String tableProperties) {
String tableProperties,
Integer parallelism) {
super(warehouse, catalogConfig);
this.connector = connector;
this.sourceTable = sourceTable;
this.targetTable = targetTable;
this.deleteOrigin = deleteOrigin;
this.tableProperties = tableProperties;
this.parallelism = parallelism;
}

@Override
Expand All @@ -58,6 +61,7 @@ public void run() throws Exception {
connector,
sourceTable,
targetTable,
deleteOrigin);
deleteOrigin,
parallelism);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public class MigrateFileActionFactory implements ActionFactory {
private static final String DELETE_ORIGIN = "delete_origin";

private static final String OPTIONS = "options";
private static final String PARALLELISM = "parallelism";

@Override
public String identifier() {
Expand All @@ -50,6 +51,7 @@ public Optional<Action> create(MultipleParameterToolAdapter params) {
boolean deleteOrigin = Boolean.parseBoolean(params.get(DELETE_ORIGIN));
Map<String, String> catalogConfig = optionalConfigMap(params, CATALOG_CONF);
String tableConf = params.get(OPTIONS);
Integer parallelism = Integer.parseInt(params.get(PARALLELISM));

MigrateFileAction migrateFileAction =
new MigrateFileAction(
Expand All @@ -59,7 +61,8 @@ public Optional<Action> create(MultipleParameterToolAdapter params) {
targetTable,
deleteOrigin,
catalogConfig,
tableConf);
tableConf,
parallelism);
return Optional.of(migrateFileAction);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,24 +30,31 @@ public class MigrateTableAction extends ActionBase {
private final String connector;
private final String hiveTableFullName;
private final String tableProperties;
private final Integer parallelism;

public MigrateTableAction(
String connector,
String warehouse,
String hiveTableFullName,
Map<String, String> catalogConfig,
String tableProperties) {
String tableProperties,
Integer parallelism) {
super(warehouse, catalogConfig);
this.connector = connector;
this.hiveTableFullName = hiveTableFullName;
this.tableProperties = tableProperties;
this.parallelism = parallelism;
}

@Override
public void run() throws Exception {
MigrateTableProcedure migrateTableProcedure = new MigrateTableProcedure();
migrateTableProcedure.withCatalog(catalog);
migrateTableProcedure.call(
new DefaultProcedureContext(env), connector, hiveTableFullName, tableProperties);
new DefaultProcedureContext(env),
connector,
hiveTableFullName,
tableProperties,
parallelism);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ public class MigrateTableActionFactory implements ActionFactory {

private static final String SOURCE_TYPE = "source_type";
private static final String OPTIONS = "options";
private static final String PARALLELISM = "parallelism";

@Override
public String identifier() {
Expand All @@ -41,10 +42,16 @@ public Optional<Action> create(MultipleParameterToolAdapter params) {
String sourceHiveTable = params.get(TABLE);
Map<String, String> catalogConfig = optionalConfigMap(params, CATALOG_CONF);
String tableConf = params.get(OPTIONS);
Integer parallelism = Integer.parseInt(params.get(PARALLELISM));

MigrateTableAction migrateTableAction =
new MigrateTableAction(
connector, warehouse, sourceHiveTable, catalogConfig, tableConf);
connector,
warehouse,
sourceHiveTable,
catalogConfig,
tableConf,
parallelism);
return Optional.of(migrateTableAction);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,20 +41,27 @@ public String identifier() {
argument = {
@ArgumentHint(name = "connector", type = @DataTypeHint("STRING")),
@ArgumentHint(name = "source_database", type = @DataTypeHint("STRING")),
@ArgumentHint(name = "options", type = @DataTypeHint("STRING"), isOptional = true)
@ArgumentHint(name = "options", type = @DataTypeHint("STRING"), isOptional = true),
@ArgumentHint(
name = "parallelism",
type = @DataTypeHint("Integer"),
isOptional = true)
})
public String[] call(
ProcedureContext procedureContext,
String connector,
String sourceDatabasePath,
String properties)
String properties,
Integer parallelism)
throws Exception {
properties = notnull(properties);
Integer p = parallelism == null ? Runtime.getRuntime().availableProcessors() : parallelism;
List<Migrator> migrators =
TableMigrationUtils.getImporters(
connector,
catalog,
sourceDatabasePath,
p,
ParameterUtils.parseCommaSeparatedKeyValues(properties));

for (Migrator migrator : migrators) {
Expand Down
Loading

0 comments on commit 983a552

Please sign in to comment.