From 333a18f05d9cffe661f93791eea1f2a86d243702 Mon Sep 17 00:00:00 2001
From: yuzelin <747884505@qq.com>
Date: Wed, 13 Mar 2024 10:57:18 +0800
Subject: [PATCH] [flink] Remove MigrateDatabaseProcedure's dependency on Hive

---
 .../org/apache/paimon/migrate/Migrator.java   |  2 +
 paimon-flink/paimon-flink-common/pom.xml      | 54 -------------------
 .../procedure/MigrateDatabaseProcedure.java   | 38 ++++---------
 .../flink/utils/TableMigrationUtils.java      | 14 +++++
 .../paimon/hive/migrate/HiveMigrator.java     | 36 +++++++++++++
 5 files changed, 63 insertions(+), 81 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/migrate/Migrator.java b/paimon-core/src/main/java/org/apache/paimon/migrate/Migrator.java
index 7420528e42e6..b09a9ba3802b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/migrate/Migrator.java
+++ b/paimon-core/src/main/java/org/apache/paimon/migrate/Migrator.java
@@ -22,4 +22,6 @@
 public interface Migrator {
 
     void executeMigrate() throws Exception;
+
+    void renameTable(boolean ignoreIfNotExists) throws Exception;
 }
diff --git a/paimon-flink/paimon-flink-common/pom.xml b/paimon-flink/paimon-flink-common/pom.xml
index 74c53a85b298..c4bc2a1d1fd9 100644
--- a/paimon-flink/paimon-flink-common/pom.xml
+++ b/paimon-flink/paimon-flink-common/pom.xml
@@ -35,7 +35,6 @@ under the License.
 
     <properties>
         <flink.version>1.18.1</flink.version>
-        <hive.version>2.3.9</hive.version>
     </properties>
 
     <dependencies>
@@ -67,59 +66,6 @@ under the License.
             <scope>provided</scope>
         </dependency>
 
-        <dependency>
-            <groupId>org.apache.hive</groupId>
-            <artifactId>hive-metastore</artifactId>
-            <version>${hive.version}</version>
-            <scope>provided</scope>
-            <exclusions>
-                <exclusion>
-                    <groupId>log4j</groupId>
-                    <artifactId>log4j</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.slf4j</groupId>
-                    <artifactId>slf4j-log4j12</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>com.fasterxml.jackson.core</groupId>
-                    <artifactId>jackson-annotations</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>com.fasterxml.jackson.core</groupId>
-                    <artifactId>jackson-core</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>com.fasterxml.jackson.core</groupId>
-                    <artifactId>jackson-databind</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.apache.orc</groupId>
-                    <artifactId>orc-core</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.pentaho</groupId>
-                    <artifactId>*</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>com.google.protobuf</groupId>
-                    <artifactId>protobuf-java</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.apache.hive</groupId>
-                    <artifactId>hive-storage-api</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.apache.avro</groupId>
-                    <artifactId>avro</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.apache.hadoop</groupId>
-                    <artifactId>hadoop-common</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-
         <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-hdfs</artifactId>
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateDatabaseProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateDatabaseProcedure.java
index 4f2a849c96da..166544eb7357 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateDatabaseProcedure.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateDatabaseProcedure.java
@@ -18,24 +18,18 @@
 package org.apache.paimon.flink.procedure;
 
-import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.flink.utils.TableMigrationUtils;
 import org.apache.paimon.hive.HiveCatalog;
+import org.apache.paimon.migrate.Migrator;
 import org.apache.paimon.utils.ParameterUtils;
 
 import org.apache.flink.table.procedure.ProcedureContext;
-import org.apache.hadoop.hive.metastore.IMetaStoreClient;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.List;
 
 /** Migrate procedure to migrate all hive tables in database to paimon table. */
 public class MigrateDatabaseProcedure extends ProcedureBase {
 
-    private static final Logger LOG = LoggerFactory.getLogger(MigrateDatabaseProcedure.class);
-    private static final String PAIMON_SUFFIX = "_paimon_";
-
     @Override
     public String identifier() {
         return "migrate_database";
     }
@@ -57,28 +51,18 @@ public String[] call(
             throw new IllegalArgumentException("Only support Hive Catalog");
         }
         HiveCatalog hiveCatalog = (HiveCatalog) this.catalog;
-        IMetaStoreClient client = hiveCatalog.getHmsClient();
-        List<String> sourceTables = client.getAllTables(sourceDatabasePath);
-        for (String sourceTable : sourceTables) {
-            String sourceTablePath = sourceDatabasePath + "." + sourceTable;
-            String targetPaimonTablePath = sourceTablePath + PAIMON_SUFFIX;
-
-            Identifier sourceTableId = Identifier.fromString(sourceTablePath);
-            Identifier targetTableId = Identifier.fromString(targetPaimonTablePath);
+        List<Migrator> migrators =
+                TableMigrationUtils.getImporters(
+                        connector,
+                        hiveCatalog,
+                        sourceDatabasePath,
+                        ParameterUtils.parseCommaSeparatedKeyValues(properties));
 
-            TableMigrationUtils.getImporter(
-                    connector,
-                    (HiveCatalog) this.catalog,
-                    sourceTableId.getDatabaseName(),
-                    sourceTableId.getObjectName(),
-                    targetTableId.getDatabaseName(),
-                    targetTableId.getObjectName(),
-                    ParameterUtils.parseCommaSeparatedKeyValues(properties))
-                    .executeMigrate();
-
-            LOG.info("rename " + targetTableId + " to " + sourceTableId);
-            this.catalog.renameTable(targetTableId, sourceTableId, false);
+        for (Migrator migrator : migrators) {
+            migrator.executeMigrate();
+            migrator.renameTable(false);
         }
+
         return new String[] {"Success"};
     }
 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableMigrationUtils.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableMigrationUtils.java
index 32f263ab3d61..32d017be6665 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableMigrationUtils.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableMigrationUtils.java
@@ -22,6 +22,7 @@
 import org.apache.paimon.hive.migrate.HiveMigrator;
 import org.apache.paimon.migrate.Migrator;
 
+import java.util.List;
 import java.util.Map;
 
 /** Migration util to choose importer according to connector. */
@@ -48,4 +49,17 @@ public static Migrator getImporter(
             throw new UnsupportedOperationException("Don't support connector " + connector);
         }
     }
+
+    public static List<Migrator> getImporters(
+            String connector,
+            HiveCatalog paimonCatalog,
+            String sourceDatabase,
+            Map<String, String> options) {
+        switch (connector) {
+            case "hive":
+                return HiveMigrator.databaseMigrators(paimonCatalog, sourceDatabase, options);
+            default:
+                throw new UnsupportedOperationException("Don't support connector " + connector);
+        }
+    }
 }
diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HiveMigrator.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HiveMigrator.java
index 642236cfcfd5..fd31ec3ec0d7 100644
--- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HiveMigrator.java
+++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HiveMigrator.java
@@ -42,6 +42,9 @@
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.PrimaryKeysRequest;
 import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Comparator;
@@ -60,9 +63,13 @@
 /** Migrate hive table to paimon table. */
 public class HiveMigrator implements Migrator {
 
+    private static final Logger LOG = LoggerFactory.getLogger(HiveMigrator.class);
+
     private static final Predicate<FileStatus> HIDDEN_PATH_FILTER =
             p -> !p.getPath().getName().startsWith("_") && !p.getPath().getName().startsWith(".");
 
+    private static final String PAIMON_SUFFIX = "_paimon_";
+
     private final FileIO fileIO;
     private final HiveCatalog hiveCatalog;
     private final IMetaStoreClient client;
@@ -89,6 +96,27 @@ public HiveMigrator(
         this.options = options;
     }
 
+    public static List<Migrator> databaseMigrators(
+            HiveCatalog hiveCatalog, String sourceDatabase, Map<String, String> options) {
+        IMetaStoreClient client = hiveCatalog.getHmsClient();
+        try {
+            return client.getAllTables(sourceDatabase).stream()
+                    .map(
+                            sourceTable ->
+                                    new HiveMigrator(
+                                            hiveCatalog,
+                                            sourceDatabase,
+                                            sourceTable,
+                                            sourceDatabase,
+                                            sourceTable + PAIMON_SUFFIX,
+                                            options))
+                    .collect(Collectors.toList());
+        } catch (TException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
     public void executeMigrate() throws Exception {
         if (!client.tableExists(sourceDatabase, sourceTable)) {
             throw new RuntimeException("Source hive table does not exist");
@@ -178,6 +206,14 @@ public void executeMigrate() throws Exception {
         client.dropTable(sourceDatabase, sourceTable, true, true);
     }
 
+    @Override
+    public void renameTable(boolean ignoreIfNotExists) throws Exception {
+        Identifier targetTableId = Identifier.create(targetDatabase, targetTable);
+        Identifier sourceTableId = Identifier.create(sourceDatabase, sourceTable);
+        LOG.info("Last step: rename {} to {}.", targetTableId, sourceTableId);
+        hiveCatalog.renameTable(targetTableId, sourceTableId, ignoreIfNotExists);
+    }
+
     private void checkPrimaryKey() throws Exception {
         PrimaryKeysRequest primaryKeysRequest = new PrimaryKeysRequest(sourceDatabase, sourceTable);
         if (!client.getPrimaryKeys(primaryKeysRequest).isEmpty()) {
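
Usage sketch for reviewers: the user-facing behavior of migrate_database is
unchanged by this patch; only the Hive metastore access moves from the Flink
procedure into HiveMigrator. A minimal example of invoking the procedure from
Flink SQL, assuming the session uses a Paimon HiveCatalog; the database name
and table option below are made up for illustration:

    -- Migrate every Hive table in my_hive_db to a Paimon table of the same name.
    CALL sys.migrate_database('hive', 'my_hive_db', 'file.format=orc');

Each HiveMigrator first migrates a source table into a temporary
<table>_paimon_ target (PAIMON_SUFFIX), then renameTable(false) renames it
back over the source table name, matching what the old in-procedure loop did
without the procedure touching IMetaStoreClient directly.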