From 10d51280b406bed8a330171ef5bc174e1204e7dc Mon Sep 17 00:00:00 2001 From: YeJunHao <41894543+leaves12138@users.noreply.github.com> Date: Thu, 30 Nov 2023 21:31:43 +0800 Subject: [PATCH] [Flink] Add log for migration and optimize process if failed (#2427) --- .../procedure/MigrateTableProcedure.java | 5 + .../paimon/hive/migrate/HiveMigrator.java | 134 ++++++++++-------- 2 files changed, 79 insertions(+), 60 deletions(-) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java index 31b86dbb6c15..f721f89257e5 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java @@ -24,10 +24,14 @@ import org.apache.paimon.utils.ParameterUtils; import org.apache.flink.table.procedure.ProcedureContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** Migrate procedure to migrate hive table to paimon table. */ public class MigrateTableProcedure extends ProcedureBase { + private static final Logger LOG = LoggerFactory.getLogger(MigrateTableProcedure.class); + private static final String PAIMON_SUFFIX = "_paimon_"; @Override @@ -66,6 +70,7 @@ public String[] call( ParameterUtils.parseCommaSeparatedKeyValues(properties)) .executeMigrate(); + LOG.info("Last step: rename " + targetTableId + " to " + sourceTableId); catalog.renameTable(targetTableId, sourceTableId, false); return new String[] {"Success"}; } diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HiveMigrator.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HiveMigrator.java index e45d392284ac..28fc5749d019 100644 --- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HiveMigrator.java +++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HiveMigrator.java @@ -98,61 +98,82 @@ public void executeMigrate() throws Exception { Map properties = new HashMap<>(sourceHiveTable.getParameters()); checkPrimaryKey(); - AbstractFileStoreTable paimonTable = - createPaimonTableIfNotExists( - client.getSchema(sourceDatabase, sourceTable), - sourceHiveTable.getPartitionKeys(), - properties); - checkPaimonTable(paimonTable); - - List partitionsNames = - client.listPartitionNames(sourceDatabase, sourceTable, Short.MAX_VALUE); - checkCompatible(sourceHiveTable, paimonTable); - - List tasks = new ArrayList<>(); - Map rollBack = new ConcurrentHashMap<>(); - if (partitionsNames.isEmpty()) { - tasks.add(importUnPartitionedTableTask(fileIO, sourceHiveTable, paimonTable, rollBack)); - } else { - tasks.addAll( - importPartitionedTableTask( - client, - fileIO, - partitionsNames, - sourceHiveTable, - paimonTable, - rollBack)); + // create paimon table if not exists + Identifier identifier = Identifier.create(targetDatabase, targetTable); + boolean alreadyExist = hiveCatalog.tableExists(identifier); + if (!alreadyExist) { + Schema schema = + from( + client.getSchema(sourceDatabase, sourceTable), + sourceHiveTable.getPartitionKeys(), + properties); + hiveCatalog.createTable(identifier, schema, false); } - List> futures = - tasks.stream().map(COMMON_IO_FORK_JOIN_POOL::submit).collect(Collectors.toList()); - List commitMessages = new ArrayList<>(); try { - for (Future future : futures) { - commitMessages.add(future.get()); + AbstractFileStoreTable paimonTable = + (AbstractFileStoreTable) hiveCatalog.getTable(identifier); + checkPaimonTable(paimonTable); + + List partitionsNames = + client.listPartitionNames(sourceDatabase, sourceTable, Short.MAX_VALUE); + checkCompatible(sourceHiveTable, paimonTable); + + List tasks = new ArrayList<>(); + Map rollBack = new ConcurrentHashMap<>(); + if (partitionsNames.isEmpty()) { + tasks.add( + importUnPartitionedTableTask( + fileIO, sourceHiveTable, paimonTable, rollBack)); + } else { + tasks.addAll( + importPartitionedTableTask( + client, + fileIO, + partitionsNames, + sourceHiveTable, + paimonTable, + rollBack)); } - } catch (Exception e) { - futures.forEach(f -> f.cancel(true)); - for (Future future : futures) { - // wait all task cancelled or finished - while (!future.isDone()) { - //noinspection BusyWait - Thread.sleep(100); + + List> futures = + tasks.stream() + .map(COMMON_IO_FORK_JOIN_POOL::submit) + .collect(Collectors.toList()); + List commitMessages = new ArrayList<>(); + try { + for (Future future : futures) { + commitMessages.add(future.get()); } - } - // roll back all renamed path - for (Map.Entry entry : rollBack.entrySet()) { - Path newPath = entry.getKey(); - Path origin = entry.getValue(); - if (fileIO.exists(newPath)) { - fileIO.rename(newPath, origin); + } catch (Exception e) { + futures.forEach(f -> f.cancel(true)); + for (Future future : futures) { + // wait all task cancelled or finished + while (!future.isDone()) { + //noinspection BusyWait + Thread.sleep(100); + } + } + // roll back all renamed path + for (Map.Entry entry : rollBack.entrySet()) { + Path newPath = entry.getKey(); + Path origin = entry.getValue(); + if (fileIO.exists(newPath)) { + fileIO.rename(newPath, origin); + } } - } - throw new RuntimeException("Migrating failed because exception happens", e); + throw new RuntimeException("Migrating failed because exception happens", e); + } + paimonTable.newBatchWriteBuilder().newCommit().commit(new ArrayList<>(commitMessages)); + } catch (Exception e) { + if (!alreadyExist) { + hiveCatalog.dropTable(identifier, true); + } + throw new RuntimeException("Migrating failed", e); } - paimonTable.newBatchWriteBuilder().newCommit().commit(new ArrayList<>(commitMessages)); + // if all success, drop the origin table client.dropTable(sourceDatabase, sourceTable, true, true); } @@ -175,26 +196,16 @@ private void checkPaimonTable(AbstractFileStoreTable paimonTable) { } } - private AbstractFileStoreTable createPaimonTableIfNotExists( - List fields, - List partitionFields, - Map hiveTableOptions) - throws Exception { - - Identifier identifier = Identifier.create(targetDatabase, targetTable); - if (!hiveCatalog.tableExists(identifier)) { - Schema schema = from(fields, partitionFields, hiveTableOptions); - hiveCatalog.createTable(identifier, schema, false); - } - return (AbstractFileStoreTable) hiveCatalog.getTable(identifier); - } - public Schema from( List fields, List partitionFields, Map hiveTableOptions) { HashMap paimonOptions = new HashMap<>(this.options); paimonOptions.put(CoreOptions.BUCKET.key(), "-1"); + // for compatible with hive comment system + if (hiveTableOptions.get("comment") != null) { + paimonOptions.put("hive.comment", hiveTableOptions.get("comment")); + } Schema.Builder schemaBuilder = Schema.newBuilder() @@ -334,6 +345,9 @@ public MigrateTask( @Override public CommitMessage call() throws Exception { + if (!fileIO.exists(newDir)) { + fileIO.mkdirs(newDir); + } List fileMetas = FileMetaUtils.construct( fileIO,