Skip to content

Commit

Permalink
[flink] Remove MigrateDatabaseProcedure's dependency on Hive
Browse files Browse the repository at this point in the history
  • Loading branch information
yuzelin committed Mar 13, 2024
1 parent 0087f3a commit 333a18f
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 81 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,6 @@
public interface Migrator {

void executeMigrate() throws Exception;

void renameTable(boolean ignoreIfNotExists) throws Exception;
}
54 changes: 0 additions & 54 deletions paimon-flink/paimon-flink-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ under the License.

<properties>
<flink.version>1.18.1</flink.version>
<hive.version>2.3.9</hive.version>
</properties>

<dependencies>
Expand Down Expand Up @@ -67,59 +66,6 @@ under the License.
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-metastore</artifactId>
<version>${hive.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.orc</groupId>
<artifactId>orc-core</artifactId>
</exclusion>
<exclusion>
<groupId>org.pentaho</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hive</groupId>
<artifactId>hive-storage-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,18 @@

package org.apache.paimon.flink.procedure;

import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.utils.TableMigrationUtils;
import org.apache.paimon.hive.HiveCatalog;
import org.apache.paimon.migrate.Migrator;
import org.apache.paimon.utils.ParameterUtils;

import org.apache.flink.table.procedure.ProcedureContext;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;

/** Migrate procedure to migrate all hive tables in database to paimon table. */
public class MigrateDatabaseProcedure extends ProcedureBase {

private static final Logger LOG = LoggerFactory.getLogger(MigrateDatabaseProcedure.class);
private static final String PAIMON_SUFFIX = "_paimon_";

@Override
public String identifier() {
return "migrate_database";
Expand All @@ -57,28 +51,18 @@ public String[] call(
throw new IllegalArgumentException("Only support Hive Catalog");
}
HiveCatalog hiveCatalog = (HiveCatalog) this.catalog;
IMetaStoreClient client = hiveCatalog.getHmsClient();
List<String> sourceTables = client.getAllTables(sourceDatabasePath);
for (String sourceTable : sourceTables) {
String sourceTablePath = sourceDatabasePath + "." + sourceTable;
String targetPaimonTablePath = sourceTablePath + PAIMON_SUFFIX;

Identifier sourceTableId = Identifier.fromString(sourceTablePath);
Identifier targetTableId = Identifier.fromString(targetPaimonTablePath);
List<Migrator> migrators =
TableMigrationUtils.getImporters(
connector,
hiveCatalog,
sourceDatabasePath,
ParameterUtils.parseCommaSeparatedKeyValues(properties));

TableMigrationUtils.getImporter(
connector,
(HiveCatalog) this.catalog,
sourceTableId.getDatabaseName(),
sourceTableId.getObjectName(),
targetTableId.getDatabaseName(),
targetTableId.getObjectName(),
ParameterUtils.parseCommaSeparatedKeyValues(properties))
.executeMigrate();

LOG.info("rename " + targetTableId + " to " + sourceTableId);
this.catalog.renameTable(targetTableId, sourceTableId, false);
for (Migrator migrator : migrators) {
migrator.executeMigrate();
migrator.renameTable(false);
}

return new String[] {"Success"};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.paimon.hive.migrate.HiveMigrator;
import org.apache.paimon.migrate.Migrator;

import java.util.List;
import java.util.Map;

/** Migration util to choose importer according to connector. */
Expand All @@ -48,4 +49,17 @@ public static Migrator getImporter(
throw new UnsupportedOperationException("Don't support connector " + connector);
}
}

public static List<Migrator> getImporters(
String connector,
HiveCatalog paimonCatalog,
String sourceDatabase,
Map<String, String> options) {
switch (connector) {
case "hive":
return HiveMigrator.databaseMigrators(paimonCatalog, sourceDatabase, options);
default:
throw new UnsupportedOperationException("Don't support connector " + connector);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.PrimaryKeysRequest;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Comparator;
Expand All @@ -60,9 +63,13 @@
/** Migrate hive table to paimon table. */
public class HiveMigrator implements Migrator {

private static final Logger LOG = LoggerFactory.getLogger(HiveMigrator.class);

private static final Predicate<FileStatus> HIDDEN_PATH_FILTER =
p -> !p.getPath().getName().startsWith("_") && !p.getPath().getName().startsWith(".");

private static final String PAIMON_SUFFIX = "_paimon_";

private final FileIO fileIO;
private final HiveCatalog hiveCatalog;
private final IMetaStoreClient client;
Expand All @@ -89,6 +96,27 @@ public HiveMigrator(
this.options = options;
}

public static List<Migrator> databaseMigrators(
HiveCatalog hiveCatalog, String sourceDatabase, Map<String, String> options) {
IMetaStoreClient client = hiveCatalog.getHmsClient();
try {
return client.getAllTables(sourceDatabase).stream()
.map(
sourceTable ->
new HiveMigrator(
hiveCatalog,
sourceDatabase,
sourceTable,
sourceDatabase,
sourceTable + PAIMON_SUFFIX,
options))
.collect(Collectors.toList());
} catch (TException e) {
throw new RuntimeException(e);
}
}

@Override
public void executeMigrate() throws Exception {
if (!client.tableExists(sourceDatabase, sourceTable)) {
throw new RuntimeException("Source hive table does not exist");
Expand Down Expand Up @@ -178,6 +206,14 @@ public void executeMigrate() throws Exception {
client.dropTable(sourceDatabase, sourceTable, true, true);
}

@Override
public void renameTable(boolean ignoreIfNotExists) throws Exception {
Identifier targetTableId = Identifier.create(targetDatabase, targetTable);
Identifier sourceTableId = Identifier.create(sourceDatabase, sourceTable);
LOG.info("Last step: rename {} to {}.", targetTableId, sourceTableId);
hiveCatalog.renameTable(targetTableId, sourceTableId, ignoreIfNotExists);
}

private void checkPrimaryKey() throws Exception {
PrimaryKeysRequest primaryKeysRequest = new PrimaryKeysRequest(sourceDatabase, sourceTable);
if (!client.getPrimaryKeys(primaryKeysRequest).isEmpty()) {
Expand Down

0 comments on commit 333a18f

Please sign in to comment.