diff --git a/docs/content/spark/procedures.md b/docs/content/spark/procedures.md
index 085e5866f66c..2fd1bb620989 100644
--- a/docs/content/spark/procedures.md
+++ b/docs/content/spark/procedures.md
@@ -139,9 +139,10 @@ This section introduce all available spark procedures about paimon.
options: the table options of the paimon table to migrate.
target_table: name of the target paimon table to migrate. If not set would keep the same name with origin table
delete_origin: If had set target_table, can set delete_origin to decide whether delete the origin table metadata from hms after migrate. Default is true
- options_map: Options map for adding key-value options which is a map.
+ options_map: Options map for adding key-value options which is a map.
+    parallelism: the parallelism for the migration process; defaults to the number of available processor cores.
- CALL sys.migrate_table(source_type => 'hive', table => 'default.T', options => 'file.format=parquet', options_map => map('k1','v1')) |
+ CALL sys.migrate_table(source_type => 'hive', table => 'default.T', options => 'file.format=parquet', options_map => map('k1','v1'), parallelism => 6) |
migrate_file |
@@ -151,8 +152,9 @@ This section introduce all available spark procedures about paimon.
source_table: name of the origin table to migrate. Cannot be empty.
target_table: name of the target table to be migrated. Cannot be empty.
delete_origin: If had set target_table, can set delete_origin to decide whether delete the origin table metadata from hms after migrate. Default is true
+    parallelism: the parallelism for the migration process; defaults to the number of available processor cores.
- CALL sys.migrate_file(source_type => 'hive', table => 'default.T', delete_origin => true) |
+ CALL sys.migrate_file(source_type => 'hive', table => 'default.T', delete_origin => true, parallelism => 6) |
remove_orphan_files |
diff --git a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MigrateDatabaseProcedure.java b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MigrateDatabaseProcedure.java
index 128875a8b862..b398b3258dbf 100644
--- a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MigrateDatabaseProcedure.java
+++ b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MigrateDatabaseProcedure.java
@@ -51,6 +51,31 @@ public String[] call(
connector,
catalog,
sourceDatabasePath,
+ Runtime.getRuntime().availableProcessors(),
+ ParameterUtils.parseCommaSeparatedKeyValues(properties));
+
+ for (Migrator migrator : migrators) {
+ migrator.executeMigrate();
+ migrator.renameTable(false);
+ }
+
+ return new String[] {"Success"};
+ }
+
+ public String[] call(
+ ProcedureContext procedureContext,
+ String connector,
+ String sourceDatabasePath,
+ String properties,
+ Integer parallelism)
+ throws Exception {
+ Integer p = parallelism == null ? Runtime.getRuntime().availableProcessors() : parallelism;
+ List migrators =
+ TableMigrationUtils.getImporters(
+ connector,
+ catalog,
+ sourceDatabasePath,
+ p,
ParameterUtils.parseCommaSeparatedKeyValues(properties));
for (Migrator migrator : migrators) {
diff --git a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MigrateFileProcedure.java b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MigrateFileProcedure.java
index 110b4e25fc00..5d68cc0f5722 100644
--- a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MigrateFileProcedure.java
+++ b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MigrateFileProcedure.java
@@ -40,7 +40,12 @@ public String[] call(
String sourceTablePath,
String targetPaimonTablePath)
throws Exception {
- call(procedureContext, connector, sourceTablePath, targetPaimonTablePath, true);
+ migrateHandle(
+ connector,
+ sourceTablePath,
+ targetPaimonTablePath,
+ true,
+ Runtime.getRuntime().availableProcessors());
return new String[] {"Success"};
}
@@ -51,7 +56,25 @@ public String[] call(
String targetPaimonTablePath,
boolean deleteOrigin)
throws Exception {
- migrateHandle(connector, sourceTablePath, targetPaimonTablePath, deleteOrigin);
+ migrateHandle(
+ connector,
+ sourceTablePath,
+ targetPaimonTablePath,
+ deleteOrigin,
+ Runtime.getRuntime().availableProcessors());
+ return new String[] {"Success"};
+ }
+
+ public String[] call(
+ ProcedureContext procedureContext,
+ String connector,
+ String sourceTablePath,
+ String targetPaimonTablePath,
+ boolean deleteOrigin,
+ Integer parallelism)
+ throws Exception {
+ Integer p = parallelism == null ? Runtime.getRuntime().availableProcessors() : parallelism;
+ migrateHandle(connector, sourceTablePath, targetPaimonTablePath, deleteOrigin, p);
return new String[] {"Success"};
}
@@ -59,7 +82,8 @@ public void migrateHandle(
String connector,
String sourceTablePath,
String targetPaimonTablePath,
- boolean deleteOrigin)
+ boolean deleteOrigin,
+ Integer parallelism)
throws Exception {
Identifier sourceTableId = Identifier.fromString(sourceTablePath);
Identifier targetTableId = Identifier.fromString(targetPaimonTablePath);
@@ -77,6 +101,7 @@ public void migrateHandle(
sourceTableId.getObjectName(),
targetTableId.getDatabaseName(),
targetTableId.getObjectName(),
+ parallelism,
Collections.emptyMap());
importer.deleteOriginTable(deleteOrigin);
importer.executeMigrate();
diff --git a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java
index 39e6092d8496..196528d31c78 100644
--- a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java
+++ b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java
@@ -62,6 +62,35 @@ public String[] call(
sourceTableId.getObjectName(),
targetTableId.getDatabaseName(),
targetTableId.getObjectName(),
+ Runtime.getRuntime().availableProcessors(),
+ ParameterUtils.parseCommaSeparatedKeyValues(properties))
+ .executeMigrate();
+
+ LOG.info("Last step: rename " + targetTableId + " to " + sourceTableId);
+ catalog.renameTable(targetTableId, sourceTableId, false);
+ return new String[] {"Success"};
+ }
+
+ public String[] call(
+ ProcedureContext procedureContext,
+ String connector,
+ String sourceTablePath,
+ String properties,
+ Integer parallelism)
+ throws Exception {
+ String targetPaimonTablePath = sourceTablePath + PAIMON_SUFFIX;
+
+ Identifier sourceTableId = Identifier.fromString(sourceTablePath);
+ Identifier targetTableId = Identifier.fromString(targetPaimonTablePath);
+
+ TableMigrationUtils.getImporter(
+ connector,
+ catalog,
+ sourceTableId.getDatabaseName(),
+ sourceTableId.getObjectName(),
+ targetTableId.getDatabaseName(),
+ targetTableId.getObjectName(),
+                        parallelism == null ? Runtime.getRuntime().availableProcessors() : parallelism,
ParameterUtils.parseCommaSeparatedKeyValues(properties))
.executeMigrate();
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateDatabaseAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateDatabaseAction.java
index 40c193f8a89f..6a8afd206091 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateDatabaseAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateDatabaseAction.java
@@ -29,17 +29,20 @@ public class MigrateDatabaseAction extends ActionBase {
private final String connector;
private final String hiveDatabaseName;
private final String tableProperties;
+ private final Integer parallelism;
public MigrateDatabaseAction(
String connector,
String warehouse,
String hiveDatabaseName,
Map catalogConfig,
- String tableProperties) {
+ String tableProperties,
+ Integer parallelism) {
super(warehouse, catalogConfig);
this.connector = connector;
this.hiveDatabaseName = hiveDatabaseName;
this.tableProperties = tableProperties;
+ this.parallelism = parallelism;
}
@Override
@@ -47,6 +50,10 @@ public void run() throws Exception {
MigrateDatabaseProcedure migrateDatabaseProcedure = new MigrateDatabaseProcedure();
migrateDatabaseProcedure.withCatalog(catalog);
migrateDatabaseProcedure.call(
- new DefaultProcedureContext(env), connector, hiveDatabaseName, tableProperties);
+ new DefaultProcedureContext(env),
+ connector,
+ hiveDatabaseName,
+ tableProperties,
+ parallelism);
}
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateDatabaseActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateDatabaseActionFactory.java
index 25bb50e79b8c..8ce33f58e644 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateDatabaseActionFactory.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateDatabaseActionFactory.java
@@ -28,6 +28,7 @@ public class MigrateDatabaseActionFactory implements ActionFactory {
private static final String SOURCE_TYPE = "source_type";
private static final String OPTIONS = "options";
+ private static final String PARALLELISM = "parallelism";
@Override
public String identifier() {
@@ -41,10 +42,16 @@ public Optional create(MultipleParameterToolAdapter params) {
String sourceHiveDatabase = params.get(DATABASE);
Map catalogConfig = optionalConfigMap(params, CATALOG_CONF);
String tableConf = params.get(OPTIONS);
+        Integer parallelism = params.get(PARALLELISM) == null ? null : Integer.parseInt(params.get(PARALLELISM));
MigrateDatabaseAction migrateDatabaseAction =
new MigrateDatabaseAction(
- connector, warehouse, sourceHiveDatabase, catalogConfig, tableConf);
+ connector,
+ warehouse,
+ sourceHiveDatabase,
+ catalogConfig,
+ tableConf,
+ parallelism);
return Optional.of(migrateDatabaseAction);
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateFileAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateFileAction.java
index 0f24046853de..798d1d347732 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateFileAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateFileAction.java
@@ -32,6 +32,7 @@ public class MigrateFileAction extends ActionBase {
private final String targetTable;
private final String tableProperties;
private boolean deleteOrigin;
+ private Integer parallelism;
public MigrateFileAction(
String connector,
@@ -40,13 +41,15 @@ public MigrateFileAction(
String targetTable,
boolean deleteOrigin,
Map catalogConfig,
- String tableProperties) {
+ String tableProperties,
+ Integer parallelism) {
super(warehouse, catalogConfig);
this.connector = connector;
this.sourceTable = sourceTable;
this.targetTable = targetTable;
this.deleteOrigin = deleteOrigin;
this.tableProperties = tableProperties;
+ this.parallelism = parallelism;
}
@Override
@@ -58,6 +61,7 @@ public void run() throws Exception {
connector,
sourceTable,
targetTable,
- deleteOrigin);
+ deleteOrigin,
+ parallelism);
}
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateFileActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateFileActionFactory.java
index 3c15b03cf6f8..34df99cfdf5b 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateFileActionFactory.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateFileActionFactory.java
@@ -35,6 +35,7 @@ public class MigrateFileActionFactory implements ActionFactory {
private static final String DELETE_ORIGIN = "delete_origin";
private static final String OPTIONS = "options";
+ private static final String PARALLELISM = "parallelism";
@Override
public String identifier() {
@@ -50,6 +51,7 @@ public Optional create(MultipleParameterToolAdapter params) {
boolean deleteOrigin = Boolean.parseBoolean(params.get(DELETE_ORIGIN));
Map catalogConfig = optionalConfigMap(params, CATALOG_CONF);
String tableConf = params.get(OPTIONS);
+        Integer parallelism = params.get(PARALLELISM) == null ? null : Integer.parseInt(params.get(PARALLELISM));
MigrateFileAction migrateFileAction =
new MigrateFileAction(
@@ -59,7 +61,8 @@ public Optional create(MultipleParameterToolAdapter params) {
targetTable,
deleteOrigin,
catalogConfig,
- tableConf);
+ tableConf,
+ parallelism);
return Optional.of(migrateFileAction);
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateTableAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateTableAction.java
index ae7ff0216a77..8a4efdfc710d 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateTableAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateTableAction.java
@@ -30,17 +30,20 @@ public class MigrateTableAction extends ActionBase {
private final String connector;
private final String hiveTableFullName;
private final String tableProperties;
+ private final Integer parallelism;
public MigrateTableAction(
String connector,
String warehouse,
String hiveTableFullName,
Map catalogConfig,
- String tableProperties) {
+ String tableProperties,
+ Integer parallelism) {
super(warehouse, catalogConfig);
this.connector = connector;
this.hiveTableFullName = hiveTableFullName;
this.tableProperties = tableProperties;
+ this.parallelism = parallelism;
}
@Override
@@ -48,6 +51,10 @@ public void run() throws Exception {
MigrateTableProcedure migrateTableProcedure = new MigrateTableProcedure();
migrateTableProcedure.withCatalog(catalog);
migrateTableProcedure.call(
- new DefaultProcedureContext(env), connector, hiveTableFullName, tableProperties);
+ new DefaultProcedureContext(env),
+ connector,
+ hiveTableFullName,
+ tableProperties,
+ parallelism);
}
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateTableActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateTableActionFactory.java
index ffe2a9b99cc6..a1a93bc91163 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateTableActionFactory.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateTableActionFactory.java
@@ -28,6 +28,7 @@ public class MigrateTableActionFactory implements ActionFactory {
private static final String SOURCE_TYPE = "source_type";
private static final String OPTIONS = "options";
+ private static final String PARALLELISM = "parallelism";
@Override
public String identifier() {
@@ -41,10 +42,16 @@ public Optional create(MultipleParameterToolAdapter params) {
String sourceHiveTable = params.get(TABLE);
Map catalogConfig = optionalConfigMap(params, CATALOG_CONF);
String tableConf = params.get(OPTIONS);
+        Integer parallelism = params.get(PARALLELISM) == null ? null : Integer.parseInt(params.get(PARALLELISM));
MigrateTableAction migrateTableAction =
new MigrateTableAction(
- connector, warehouse, sourceHiveTable, catalogConfig, tableConf);
+ connector,
+ warehouse,
+ sourceHiveTable,
+ catalogConfig,
+ tableConf,
+ parallelism);
return Optional.of(migrateTableAction);
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateDatabaseProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateDatabaseProcedure.java
index ad4f704705a7..936d9069e3b2 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateDatabaseProcedure.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateDatabaseProcedure.java
@@ -41,20 +41,27 @@ public String identifier() {
argument = {
@ArgumentHint(name = "connector", type = @DataTypeHint("STRING")),
@ArgumentHint(name = "source_database", type = @DataTypeHint("STRING")),
- @ArgumentHint(name = "options", type = @DataTypeHint("STRING"), isOptional = true)
+ @ArgumentHint(name = "options", type = @DataTypeHint("STRING"), isOptional = true),
+ @ArgumentHint(
+ name = "parallelism",
+                        type = @DataTypeHint("INT"),
+ isOptional = true)
})
public String[] call(
ProcedureContext procedureContext,
String connector,
String sourceDatabasePath,
- String properties)
+ String properties,
+ Integer parallelism)
throws Exception {
properties = notnull(properties);
+ Integer p = parallelism == null ? Runtime.getRuntime().availableProcessors() : parallelism;
List migrators =
TableMigrationUtils.getImporters(
connector,
catalog,
sourceDatabasePath,
+ p,
ParameterUtils.parseCommaSeparatedKeyValues(properties));
for (Migrator migrator : migrators) {
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateFileProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateFileProcedure.java
index 2538be288109..34b016fe0d36 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateFileProcedure.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateFileProcedure.java
@@ -45,6 +45,10 @@ public String identifier() {
@ArgumentHint(
name = "delete_origin",
type = @DataTypeHint("BOOLEAN"),
+ isOptional = true),
+ @ArgumentHint(
+ name = "parallelism",
+                        type = @DataTypeHint("INT"),
isOptional = true)
})
public String[] call(
@@ -52,12 +56,14 @@ public String[] call(
String connector,
String sourceTablePath,
String targetPaimonTablePath,
- Boolean deleteOrigin)
+ Boolean deleteOrigin,
+ Integer parallelism)
throws Exception {
if (deleteOrigin == null) {
deleteOrigin = true;
}
- migrateHandle(connector, sourceTablePath, targetPaimonTablePath, deleteOrigin);
+ Integer p = parallelism == null ? Runtime.getRuntime().availableProcessors() : parallelism;
+ migrateHandle(connector, sourceTablePath, targetPaimonTablePath, deleteOrigin, p);
return new String[] {"Success"};
}
@@ -65,7 +71,8 @@ public void migrateHandle(
String connector,
String sourceTablePath,
String targetPaimonTablePath,
- boolean deleteOrigin)
+ boolean deleteOrigin,
+ Integer parallelism)
throws Exception {
Identifier sourceTableId = Identifier.fromString(sourceTablePath);
Identifier targetTableId = Identifier.fromString(targetPaimonTablePath);
@@ -83,6 +90,7 @@ public void migrateHandle(
sourceTableId.getObjectName(),
targetTableId.getDatabaseName(),
targetTableId.getObjectName(),
+ parallelism,
Collections.emptyMap());
importer.deleteOriginTable(deleteOrigin);
importer.executeMigrate();
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java
index 3dfc948c6bec..fff05a1a8555 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java
@@ -45,13 +45,18 @@ public String identifier() {
argument = {
@ArgumentHint(name = "connector", type = @DataTypeHint("STRING")),
@ArgumentHint(name = "source_table", type = @DataTypeHint("STRING")),
- @ArgumentHint(name = "options", type = @DataTypeHint("STRING"), isOptional = true)
+ @ArgumentHint(name = "options", type = @DataTypeHint("STRING"), isOptional = true),
+ @ArgumentHint(
+ name = "parallelism",
+                        type = @DataTypeHint("INT"),
+ isOptional = true)
})
public String[] call(
ProcedureContext procedureContext,
String connector,
String sourceTablePath,
- String properties)
+ String properties,
+ Integer parallelism)
throws Exception {
properties = notnull(properties);
@@ -60,6 +65,8 @@ public String[] call(
Identifier sourceTableId = Identifier.fromString(sourceTablePath);
Identifier targetTableId = Identifier.fromString(targetPaimonTablePath);
+ Integer p = parallelism == null ? Runtime.getRuntime().availableProcessors() : parallelism;
+
TableMigrationUtils.getImporter(
connector,
catalog,
@@ -67,6 +74,7 @@ public String[] call(
sourceTableId.getObjectName(),
targetTableId.getDatabaseName(),
targetTableId.getObjectName(),
+ p,
ParameterUtils.parseCommaSeparatedKeyValues(properties))
.executeMigrate();
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableMigrationUtils.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableMigrationUtils.java
index 47655d39a0f7..f493dd244dd4 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableMigrationUtils.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableMigrationUtils.java
@@ -37,6 +37,7 @@ public static Migrator getImporter(
String souceTableName,
String targetDatabase,
String targetTableName,
+ Integer parallelism,
Map options) {
switch (connector) {
case "hive":
@@ -52,6 +53,7 @@ public static Migrator getImporter(
souceTableName,
targetDatabase,
targetTableName,
+ parallelism,
options);
default:
throw new UnsupportedOperationException("Don't support connector " + connector);
@@ -59,7 +61,11 @@ public static Migrator getImporter(
}
public static List getImporters(
- String connector, Catalog catalog, String sourceDatabase, Map options) {
+ String connector,
+ Catalog catalog,
+ String sourceDatabase,
+ Integer parallelism,
+ Map options) {
switch (connector) {
case "hive":
if (catalog instanceof CachingCatalog) {
@@ -69,7 +75,7 @@ public static List getImporters(
throw new IllegalArgumentException("Only support Hive Catalog");
}
return HiveMigrator.databaseMigrators(
- (HiveCatalog) catalog, sourceDatabase, options);
+ (HiveCatalog) catalog, sourceDatabase, options, parallelism);
default:
throw new UnsupportedOperationException("Don't support connector " + connector);
}
diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HiveMigrator.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HiveMigrator.java
index 379c61f703c4..b9928ce7311b 100644
--- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HiveMigrator.java
+++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HiveMigrator.java
@@ -66,9 +66,7 @@
public class HiveMigrator implements Migrator {
private static final Logger LOG = LoggerFactory.getLogger(HiveMigrator.class);
-
- private static final ThreadPoolExecutor EXECUTOR =
- createCachedThreadPool(Runtime.getRuntime().availableProcessors(), "HIVE_MIGRATOR");
+ private ThreadPoolExecutor executor;
private static final Predicate HIDDEN_PATH_FILTER =
p -> !p.getPath().getName().startsWith("_") && !p.getPath().getName().startsWith(".");
@@ -84,6 +82,7 @@ public class HiveMigrator implements Migrator {
private final String targetTable;
private final CoreOptions coreOptions;
private Boolean delete = true;
+ private Integer parallelism;
public HiveMigrator(
HiveCatalog hiveCatalog,
@@ -91,6 +90,7 @@ public HiveMigrator(
String sourceTable,
String targetDatabase,
String targetTable,
+ Integer parallelism,
Map options) {
this.hiveCatalog = hiveCatalog;
this.fileIO = hiveCatalog.fileIO();
@@ -99,11 +99,16 @@ public HiveMigrator(
this.sourceTable = sourceTable;
this.targetDatabase = targetDatabase;
this.targetTable = targetTable;
+ this.parallelism = parallelism;
this.coreOptions = new CoreOptions(options);
+ this.executor = createCachedThreadPool(parallelism, "HIVE_MIGRATOR");
}
public static List databaseMigrators(
- HiveCatalog hiveCatalog, String sourceDatabase, Map options) {
+ HiveCatalog hiveCatalog,
+ String sourceDatabase,
+ Map options,
+ Integer parallelism) {
IMetaStoreClient client = hiveCatalog.getHmsClient();
try {
return client.getAllTables(sourceDatabase).stream()
@@ -115,6 +120,7 @@ public static List databaseMigrators(
sourceTable,
sourceDatabase,
sourceTable + PAIMON_SUFFIX,
+ parallelism,
options))
.collect(Collectors.toList());
} catch (TException e) {
@@ -175,7 +181,7 @@ public void executeMigrate() throws Exception {
}
List> futures =
- tasks.stream().map(EXECUTOR::submit).collect(Collectors.toList());
+ tasks.stream().map(executor::submit).collect(Collectors.toList());
List commitMessages = new ArrayList<>();
try {
for (Future future : futures) {
diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateDatabaseProcedureITCase.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateDatabaseProcedureITCase.java
index 7829a9981ede..0acb7b8398ed 100644
--- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateDatabaseProcedureITCase.java
+++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateDatabaseProcedureITCase.java
@@ -130,7 +130,7 @@ public void testUpgradePartitionTable(String format, boolean isNamedArgument) th
tEnv.executeSql(
"CALL sys.migrate_database(connector => 'hive', source_database => 'my_database', options => 'file.format="
+ format
- + "')")
+ + "', parallelism => 6)")
.await();
} else {
tEnv.executeSql(
@@ -257,7 +257,8 @@ public void testMigrateDatabaseAction(String format) throws Exception {
System.getProperty(HiveConf.ConfVars.METASTOREWAREHOUSE.varname),
"my_database",
catalogConf,
- "");
+ "",
+ 6);
migrateDatabaseAction.run();
tEnv.executeSql(
diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateFileProcedureITCase.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateFileProcedureITCase.java
index 3f9aebb044aa..a50bdca3d58e 100644
--- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateFileProcedureITCase.java
+++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateFileProcedureITCase.java
@@ -169,7 +169,8 @@ public void testMigrateFileAction(String format, boolean isNamedArgument) throws
"default.paimontable02",
false,
catalogConf,
- "");
+ "",
+ 6);
migrateFileAction.run();
tEnv.useCatalog("HIVE");
diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateTableProcedureITCase.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateTableProcedureITCase.java
index b88e2b95b998..ca3b6c82e7d3 100644
--- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateTableProcedureITCase.java
+++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateTableProcedureITCase.java
@@ -191,7 +191,8 @@ public void testMigrateAction(String format) throws Exception {
System.getProperty(HiveConf.ConfVars.METASTOREWAREHOUSE.varname),
"default.hivetable",
catalogConf,
- "");
+ "",
+ 6);
migrateTableAction.run();
tEnv.executeSql(
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/MigrateFileProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/MigrateFileProcedure.java
index 8f4850fef48c..32f89d47b1a0 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/MigrateFileProcedure.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/MigrateFileProcedure.java
@@ -33,6 +33,7 @@
import java.util.Collections;
import static org.apache.spark.sql.types.DataTypes.BooleanType;
+import static org.apache.spark.sql.types.DataTypes.IntegerType;
import static org.apache.spark.sql.types.DataTypes.StringType;
/**
@@ -49,7 +50,8 @@ public class MigrateFileProcedure extends BaseProcedure {
ProcedureParameter.required("source_type", StringType),
ProcedureParameter.required("source_table", StringType),
ProcedureParameter.required("target_table", StringType),
- ProcedureParameter.optional("delete_origin", BooleanType)
+ ProcedureParameter.optional("delete_origin", BooleanType),
+ ProcedureParameter.optional("parallelism", IntegerType)
};
private static final StructType OUTPUT_TYPE =
@@ -78,6 +80,8 @@ public InternalRow[] call(InternalRow args) {
String sourceTable = args.getString(1);
String targetTable = args.getString(2);
boolean deleteNeed = args.isNullAt(3) ? true : args.getBoolean(3);
+ int parallelism =
+ args.isNullAt(4) ? Runtime.getRuntime().availableProcessors() : args.getInt(4);
Identifier sourceTableId = Identifier.fromString(sourceTable);
Identifier targetTableId = Identifier.fromString(targetTable);
@@ -98,6 +102,7 @@ public InternalRow[] call(InternalRow args) {
sourceTableId.getObjectName(),
targetTableId.getDatabaseName(),
targetTableId.getObjectName(),
+ parallelism,
Collections.emptyMap());
migrator.deleteOriginTable(deleteNeed);
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/MigrateTableProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/MigrateTableProcedure.java
index a34bc9dacbdf..b74543df1742 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/MigrateTableProcedure.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/MigrateTableProcedure.java
@@ -38,6 +38,7 @@
import java.util.Map;
import static org.apache.spark.sql.types.DataTypes.BooleanType;
+import static org.apache.spark.sql.types.DataTypes.IntegerType;
import static org.apache.spark.sql.types.DataTypes.StringType;
/**
@@ -59,7 +60,8 @@ public class MigrateTableProcedure extends BaseProcedure {
ProcedureParameter.optional("delete_origin", BooleanType),
ProcedureParameter.optional("target_table", StringType),
ProcedureParameter.optional(
- "options_map", DataTypes.createMapType(StringType, StringType))
+ "options_map", DataTypes.createMapType(StringType, StringType)),
+ ProcedureParameter.optional("parallelism", IntegerType),
};
private static final StructType OUTPUT_TYPE =
@@ -91,6 +93,8 @@ public InternalRow[] call(InternalRow args) {
String targetTable = args.isNullAt(4) ? null : args.getString(4);
MapData mapData = args.isNullAt(5) ? null : args.getMap(5);
Map optionMap = mapDataToHashMap(mapData);
+ int parallelism =
+ args.isNullAt(6) ? Runtime.getRuntime().availableProcessors() : args.getInt(6);
Identifier sourceTableId = Identifier.fromString(sourceTable);
Identifier tmpTableId =
@@ -112,6 +116,7 @@ public InternalRow[] call(InternalRow args) {
sourceTableId.getObjectName(),
tmpTableId.getDatabaseName(),
tmpTableId.getObjectName(),
+ parallelism,
options);
migrator.deleteOriginTable(deleteNeed);
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/utils/TableMigrationUtils.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/utils/TableMigrationUtils.java
index 992f17253c9f..73702bf35c2b 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/utils/TableMigrationUtils.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/utils/TableMigrationUtils.java
@@ -36,6 +36,7 @@ public static Migrator getImporter(
String sourceTableName,
String targetDatabase,
String targetTableName,
+ Integer parallelism,
Map options) {
switch (connector) {
case "hive":
@@ -51,6 +52,7 @@ public static Migrator getImporter(
sourceTableName,
targetDatabase,
targetTableName,
+ parallelism,
options);
default:
throw new UnsupportedOperationException("Unsupported connector " + connector);
diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/MigrateFileProcedureTest.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/MigrateFileProcedureTest.scala
index ceb0879cb9cf..043223d05a8f 100644
--- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/MigrateFileProcedureTest.scala
+++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/MigrateFileProcedureTest.scala
@@ -57,6 +57,41 @@ class MigrateFileProcedureTest extends PaimonHiveTestBase {
}
})
+ Seq("parquet", "orc", "avro").foreach(
+ format => {
+ test(
+ s"Paimon migrate file procedure: migrate $format non-partitioned table with parallelism") {
+ withTable("hive_tbl_02", "paimon_tbl_02") {
+ // create hive table
+ spark.sql(s"""
+ |CREATE TABLE hive_tbl_02 (id STRING, name STRING, pt STRING)
+ |USING $format
+ |""".stripMargin)
+
+ spark.sql(s"INSERT INTO hive_tbl_02 VALUES ('1', 'a', 'p1'), ('2', 'b', 'p2')")
+
+ // create paimon table
+ spark.sql(s"""
+ |CREATE TABLE paimon_tbl_02 (id STRING, name STRING, pt STRING)
+ |USING PAIMON
+ |TBLPROPERTIES ('file.format'='$format', 'bucket'='-1')
+ |""".stripMargin)
+
+ spark.sql(s"INSERT INTO paimon_tbl_02 VALUES ('3', 'c', 'p1'), ('4', 'd', 'p2')")
+
+ spark.sql(
+ s"CALL sys.migrate_file(source_type => 'hive', source_table => '$hiveDbName.hive_tbl_02', target_table => '$hiveDbName.paimon_tbl_02', parallelism => 6)")
+
+ checkAnswer(
+ spark.sql("SELECT * FROM paimon_tbl_02 ORDER BY id"),
+ Row("1", "a", "p1") :: Row("2", "b", "p2") :: Row("3", "c", "p1") :: Row(
+ "4",
+ "d",
+ "p2") :: Nil)
+ }
+ }
+ })
+
Seq("parquet", "orc", "avro").foreach(
format => {
test(
diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/MigrateTableProcedureTest.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/MigrateTableProcedureTest.scala
index 7a97c334f68a..9bffbce09f95 100644
--- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/MigrateTableProcedureTest.scala
+++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/MigrateTableProcedureTest.scala
@@ -46,6 +46,29 @@ class MigrateTableProcedureTest extends PaimonHiveTestBase {
}
})
+ Seq("parquet", "orc", "avro").foreach(
+ format => {
+ test(
+ s"Paimon migrate table procedure: migrate $format non-partitioned table with setting parallelism") {
+ withTable("hive_tbl_01") {
+ // create hive table
+ spark.sql(s"""
+ |CREATE TABLE hive_tbl_01 (id STRING, name STRING, pt STRING)
+ |USING $format
+ |""".stripMargin)
+
+ spark.sql(s"INSERT INTO hive_tbl_01 VALUES ('1', 'a', 'p1'), ('2', 'b', 'p2')")
+
+ spark.sql(
+ s"CALL sys.migrate_table(source_type => 'hive', table => '$hiveDbName.hive_tbl_01', options => 'file.format=$format', parallelism => 6)")
+
+ checkAnswer(
+ spark.sql(s"SELECT * FROM hive_tbl_01 ORDER BY id"),
+ Row("1", "a", "p1") :: Row("2", "b", "p2") :: Nil)
+ }
+ }
+ })
+
Seq("parquet", "orc", "avro").foreach(
format => {
test(s"Paimon migrate table procedure: migrate $format table with options_map") {