From f7a4c819ac4d23a3ce7df81c361b31344db9327c Mon Sep 17 00:00:00 2001
From: yuzelin <33053040+yuzelin@users.noreply.github.com>
Date: Wed, 13 Mar 2024 11:40:06 +0800
Subject: [PATCH] [flink] Remove MigrateDatabaseProcedure's dependency on Hive
(#2997)
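
MigrateDatabaseProcedure previously enumerated the source tables through the
Hive metastore client itself, which is why paimon-flink-common had to declare
hive-metastore (plus a long exclusion list) in its pom. The enumeration now
lives behind the Migrator interface: HiveMigrator#databaseMigrators builds one
migrator per Hive table, each of which writes a "_paimon_"-suffixed target
table and then renames it over the source table's name. The Flink module only
sees Migrator, so the Hive dependency can be removed.

The SQL entry point is unchanged; an illustrative call (argument values are
examples only) is:

    CALL sys.migrate_database('hive', 'default', 'file.format=orc')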
---
.../org/apache/paimon/migrate/Migrator.java | 2 +
paimon-flink/paimon-flink-common/pom.xml | 54 -------------------
.../procedure/MigrateDatabaseProcedure.java | 38 ++++---------
.../flink/utils/TableMigrationUtils.java | 14 +++++
.../paimon/hive/migrate/HiveMigrator.java | 36 +++++++++++++
5 files changed, 63 insertions(+), 81 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/migrate/Migrator.java b/paimon-core/src/main/java/org/apache/paimon/migrate/Migrator.java
index 7420528e42e6..b09a9ba3802b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/migrate/Migrator.java
+++ b/paimon-core/src/main/java/org/apache/paimon/migrate/Migrator.java
@@ -22,4 +22,6 @@
public interface Migrator {
void executeMigrate() throws Exception;
+
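+ /**
+ * Rename the migrated table to its final name, as the last step after
+ * {@link #executeMigrate()}.
+ *
+ * @param ignoreIfNotExists whether to silently skip the rename when the table to rename
+ *     does not exist
+ */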
+ void renameTable(boolean ignoreIfNotExists) throws Exception;
}
diff --git a/paimon-flink/paimon-flink-common/pom.xml b/paimon-flink/paimon-flink-common/pom.xml
index 74c53a85b298..c4bc2a1d1fd9 100644
--- a/paimon-flink/paimon-flink-common/pom.xml
+++ b/paimon-flink/paimon-flink-common/pom.xml
@@ -35,7 +35,6 @@ under the License.
<flink.version>1.18.1</flink.version>
- <hive.version>2.3.9</hive.version>
@@ -67,59 +66,6 @@ under the License.
<scope>provided</scope>
</dependency>

- <dependency>
-     <groupId>org.apache.hive</groupId>
-     <artifactId>hive-metastore</artifactId>
-     <version>${hive.version}</version>
-     <scope>provided</scope>
-     <exclusions>
-         <exclusion>
-             <groupId>log4j</groupId>
-             <artifactId>log4j</artifactId>
-         </exclusion>
-         <exclusion>
-             <groupId>org.slf4j</groupId>
-             <artifactId>slf4j-log4j12</artifactId>
-         </exclusion>
-         <exclusion>
-             <groupId>com.fasterxml.jackson.core</groupId>
-             <artifactId>jackson-annotations</artifactId>
-         </exclusion>
-         <exclusion>
-             <groupId>com.fasterxml.jackson.core</groupId>
-             <artifactId>jackson-core</artifactId>
-         </exclusion>
-         <exclusion>
-             <groupId>com.fasterxml.jackson.core</groupId>
-             <artifactId>jackson-databind</artifactId>
-         </exclusion>
-         <exclusion>
-             <groupId>org.apache.orc</groupId>
-             <artifactId>orc-core</artifactId>
-         </exclusion>
-         <exclusion>
-             <groupId>org.pentaho</groupId>
-             <artifactId>*</artifactId>
-         </exclusion>
-         <exclusion>
-             <groupId>com.google.protobuf</groupId>
-             <artifactId>protobuf-java</artifactId>
-         </exclusion>
-         <exclusion>
-             <groupId>org.apache.hive</groupId>
-             <artifactId>hive-storage-api</artifactId>
-         </exclusion>
-         <exclusion>
-             <groupId>org.apache.avro</groupId>
-             <artifactId>avro</artifactId>
-         </exclusion>
-         <exclusion>
-             <groupId>org.apache.hadoop</groupId>
-             <artifactId>hadoop-common</artifactId>
-         </exclusion>
-     </exclusions>
- </dependency>
-

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-hdfs</artifactId>
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateDatabaseProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateDatabaseProcedure.java
index 4f2a849c96da..166544eb7357 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateDatabaseProcedure.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateDatabaseProcedure.java
@@ -18,24 +18,18 @@
package org.apache.paimon.flink.procedure;
-import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.utils.TableMigrationUtils;
import org.apache.paimon.hive.HiveCatalog;
+import org.apache.paimon.migrate.Migrator;
import org.apache.paimon.utils.ParameterUtils;
import org.apache.flink.table.procedure.ProcedureContext;
-import org.apache.hadoop.hive.metastore.IMetaStoreClient;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.util.List;
/** Procedure to migrate all Hive tables in a database to Paimon tables. */
public class MigrateDatabaseProcedure extends ProcedureBase {
- private static final Logger LOG = LoggerFactory.getLogger(MigrateDatabaseProcedure.class);
- private static final String PAIMON_SUFFIX = "_paimon_";
-
@Override
public String identifier() {
return "migrate_database";
@@ -57,28 +51,18 @@ public String[] call(
throw new IllegalArgumentException("Only support Hive Catalog");
}
HiveCatalog hiveCatalog = (HiveCatalog) this.catalog;
- IMetaStoreClient client = hiveCatalog.getHmsClient();
- List<String> sourceTables = client.getAllTables(sourceDatabasePath);
- for (String sourceTable : sourceTables) {
- String sourceTablePath = sourceDatabasePath + "." + sourceTable;
- String targetPaimonTablePath = sourceTablePath + PAIMON_SUFFIX;
-
- Identifier sourceTableId = Identifier.fromString(sourceTablePath);
- Identifier targetTableId = Identifier.fromString(targetPaimonTablePath);
+ List<Migrator> migrators =
+ TableMigrationUtils.getImporters(
+ connector,
+ hiveCatalog,
+ sourceDatabasePath,
+ ParameterUtils.parseCommaSeparatedKeyValues(properties));
- TableMigrationUtils.getImporter(
- connector,
- (HiveCatalog) this.catalog,
- sourceTableId.getDatabaseName(),
- sourceTableId.getObjectName(),
- targetTableId.getDatabaseName(),
- targetTableId.getObjectName(),
- ParameterUtils.parseCommaSeparatedKeyValues(properties))
- .executeMigrate();
-
- LOG.info("rename " + targetTableId + " to " + sourceTableId);
- this.catalog.renameTable(targetTableId, sourceTableId, false);
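+ // Each migrator copies one Hive table into its "_paimon_"-suffixed Paimon target and
+ // then renames that target back to the source table's name.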
+ for (Migrator migrator : migrators) {
+ migrator.executeMigrate();
+ migrator.renameTable(false);
}
+
return new String[] {"Success"};
}
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableMigrationUtils.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableMigrationUtils.java
index 32f263ab3d61..32d017be6665 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableMigrationUtils.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableMigrationUtils.java
@@ -22,6 +22,7 @@
import org.apache.paimon.hive.migrate.HiveMigrator;
import org.apache.paimon.migrate.Migrator;
+import java.util.List;
import java.util.Map;
/** Migration util to choose importer according to connector. */
@@ -48,4 +49,17 @@ public static Migrator getImporter(
throw new UnsupportedOperationException("Don't support connector " + connector);
}
}
+
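+ /**
+ * Create one {@link Migrator} for every table in the given source database. Currently
+ * only the "hive" connector is supported.
+ */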
+ public static List<Migrator> getImporters(
+ String connector,
+ HiveCatalog paimonCatalog,
+ String sourceDatabase,
+ Map<String, String> options) {
+ switch (connector) {
+ case "hive":
+ return HiveMigrator.databaseMigrators(paimonCatalog, sourceDatabase, options);
+ default:
+ throw new UnsupportedOperationException("Don't support connector " + connector);
+ }
+ }
}
diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HiveMigrator.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HiveMigrator.java
index 642236cfcfd5..fd31ec3ec0d7 100644
--- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HiveMigrator.java
+++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HiveMigrator.java
@@ -42,6 +42,9 @@
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.PrimaryKeysRequest;
import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Comparator;
@@ -60,9 +63,13 @@
/** Migrate hive table to paimon table. */
public class HiveMigrator implements Migrator {
+ private static final Logger LOG = LoggerFactory.getLogger(HiveMigrator.class);
+
private static final Predicate<FileStatus> HIDDEN_PATH_FILTER =
p -> !p.getPath().getName().startsWith("_") && !p.getPath().getName().startsWith(".");
+ private static final String PAIMON_SUFFIX = "_paimon_";
+
private final FileIO fileIO;
private final HiveCatalog hiveCatalog;
private final IMetaStoreClient client;
@@ -89,6 +96,27 @@ public HiveMigrator(
this.options = options;
}
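+
+ /**
+ * Build a migrator for each table in {@code sourceDatabase}. Each target table lives in
+ * the same database and is named after its source table plus the "_paimon_" suffix;
+ * {@link #renameTable(boolean)} swaps it back to the original name afterwards.
+ */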
+ public static List<Migrator> databaseMigrators(
+ HiveCatalog hiveCatalog, String sourceDatabase, Map<String, String> options) {
+ IMetaStoreClient client = hiveCatalog.getHmsClient();
+ try {
+ return client.getAllTables(sourceDatabase).stream()
+ .map(
+ sourceTable ->
+ new HiveMigrator(
+ hiveCatalog,
+ sourceDatabase,
+ sourceTable,
+ sourceDatabase,
+ sourceTable + PAIMON_SUFFIX,
+ options))
+ .collect(Collectors.toList());
+ } catch (TException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
public void executeMigrate() throws Exception {
if (!client.tableExists(sourceDatabase, sourceTable)) {
throw new RuntimeException("Source hive table does not exist");
@@ -178,6 +206,14 @@ public void executeMigrate() throws Exception {
client.dropTable(sourceDatabase, sourceTable, true, true);
}
+ @Override
+ public void renameTable(boolean ignoreIfNotExists) throws Exception {
+ Identifier targetTableId = Identifier.create(targetDatabase, targetTable);
+ Identifier sourceTableId = Identifier.create(sourceDatabase, sourceTable);
+ LOG.info("Last step: rename {} to {}.", targetTableId, sourceTableId);
+ hiveCatalog.renameTable(targetTableId, sourceTableId, ignoreIfNotExists);
+ }
+
private void checkPrimaryKey() throws Exception {
PrimaryKeysRequest primaryKeysRequest = new PrimaryKeysRequest(sourceDatabase, sourceTable);
if (!client.getPrimaryKeys(primaryKeysRequest).isEmpty()) {