Skip to content

Commit

Permalink
[Flink] Add logging for migration and improve the cleanup process when migration fails (#2427)
Browse files Browse the repository at this point in the history
  • Loading branch information
leaves12138 authored Nov 30, 2023
1 parent 7cc033c commit 10d5128
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,14 @@
import org.apache.paimon.utils.ParameterUtils;

import org.apache.flink.table.procedure.ProcedureContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Migrate procedure to migrate hive table to paimon table. */
public class MigrateTableProcedure extends ProcedureBase {

private static final Logger LOG = LoggerFactory.getLogger(MigrateTableProcedure.class);

private static final String PAIMON_SUFFIX = "_paimon_";

@Override
Expand Down Expand Up @@ -66,6 +70,7 @@ public String[] call(
ParameterUtils.parseCommaSeparatedKeyValues(properties))
.executeMigrate();

LOG.info("Last step: rename " + targetTableId + " to " + sourceTableId);
catalog.renameTable(targetTableId, sourceTableId, false);
return new String[] {"Success"};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,61 +98,82 @@ public void executeMigrate() throws Exception {
Map<String, String> properties = new HashMap<>(sourceHiveTable.getParameters());
checkPrimaryKey();

AbstractFileStoreTable paimonTable =
createPaimonTableIfNotExists(
client.getSchema(sourceDatabase, sourceTable),
sourceHiveTable.getPartitionKeys(),
properties);
checkPaimonTable(paimonTable);

List<String> partitionsNames =
client.listPartitionNames(sourceDatabase, sourceTable, Short.MAX_VALUE);
checkCompatible(sourceHiveTable, paimonTable);

List<MigrateTask> tasks = new ArrayList<>();
Map<Path, Path> rollBack = new ConcurrentHashMap<>();
if (partitionsNames.isEmpty()) {
tasks.add(importUnPartitionedTableTask(fileIO, sourceHiveTable, paimonTable, rollBack));
} else {
tasks.addAll(
importPartitionedTableTask(
client,
fileIO,
partitionsNames,
sourceHiveTable,
paimonTable,
rollBack));
// create paimon table if not exists
Identifier identifier = Identifier.create(targetDatabase, targetTable);
boolean alreadyExist = hiveCatalog.tableExists(identifier);
if (!alreadyExist) {
Schema schema =
from(
client.getSchema(sourceDatabase, sourceTable),
sourceHiveTable.getPartitionKeys(),
properties);
hiveCatalog.createTable(identifier, schema, false);
}

List<Future<CommitMessage>> futures =
tasks.stream().map(COMMON_IO_FORK_JOIN_POOL::submit).collect(Collectors.toList());
List<CommitMessage> commitMessages = new ArrayList<>();
try {
for (Future<CommitMessage> future : futures) {
commitMessages.add(future.get());
AbstractFileStoreTable paimonTable =
(AbstractFileStoreTable) hiveCatalog.getTable(identifier);
checkPaimonTable(paimonTable);

List<String> partitionsNames =
client.listPartitionNames(sourceDatabase, sourceTable, Short.MAX_VALUE);
checkCompatible(sourceHiveTable, paimonTable);

List<MigrateTask> tasks = new ArrayList<>();
Map<Path, Path> rollBack = new ConcurrentHashMap<>();
if (partitionsNames.isEmpty()) {
tasks.add(
importUnPartitionedTableTask(
fileIO, sourceHiveTable, paimonTable, rollBack));
} else {
tasks.addAll(
importPartitionedTableTask(
client,
fileIO,
partitionsNames,
sourceHiveTable,
paimonTable,
rollBack));
}
} catch (Exception e) {
futures.forEach(f -> f.cancel(true));
for (Future<?> future : futures) {
// wait all task cancelled or finished
while (!future.isDone()) {
//noinspection BusyWait
Thread.sleep(100);

List<Future<CommitMessage>> futures =
tasks.stream()
.map(COMMON_IO_FORK_JOIN_POOL::submit)
.collect(Collectors.toList());
List<CommitMessage> commitMessages = new ArrayList<>();
try {
for (Future<CommitMessage> future : futures) {
commitMessages.add(future.get());
}
}
// roll back all renamed path
for (Map.Entry<Path, Path> entry : rollBack.entrySet()) {
Path newPath = entry.getKey();
Path origin = entry.getValue();
if (fileIO.exists(newPath)) {
fileIO.rename(newPath, origin);
} catch (Exception e) {
futures.forEach(f -> f.cancel(true));
for (Future<?> future : futures) {
// wait all task cancelled or finished
while (!future.isDone()) {
//noinspection BusyWait
Thread.sleep(100);
}
}
// roll back all renamed path
for (Map.Entry<Path, Path> entry : rollBack.entrySet()) {
Path newPath = entry.getKey();
Path origin = entry.getValue();
if (fileIO.exists(newPath)) {
fileIO.rename(newPath, origin);
}
}
}

throw new RuntimeException("Migrating failed because exception happens", e);
throw new RuntimeException("Migrating failed because exception happens", e);
}
paimonTable.newBatchWriteBuilder().newCommit().commit(new ArrayList<>(commitMessages));
} catch (Exception e) {
if (!alreadyExist) {
hiveCatalog.dropTable(identifier, true);
}
throw new RuntimeException("Migrating failed", e);
}

paimonTable.newBatchWriteBuilder().newCommit().commit(new ArrayList<>(commitMessages));
// if all success, drop the origin table
client.dropTable(sourceDatabase, sourceTable, true, true);
}

Expand All @@ -175,26 +196,16 @@ private void checkPaimonTable(AbstractFileStoreTable paimonTable) {
}
}

/**
 * Returns the target paimon table identified by {@code targetDatabase}/{@code targetTable},
 * creating it in the hive catalog first when it does not exist yet.
 *
 * <p>The schema for a newly created table is derived from the given hive field list via
 * {@link #from}.
 *
 * @param fields           hive columns of the source table
 * @param partitionFields  hive partition columns of the source table
 * @param hiveTableOptions table parameters copied from the source hive table
 * @return the (possibly just created) paimon table
 * @throws Exception if catalog lookup or table creation fails
 */
private AbstractFileStoreTable createPaimonTableIfNotExists(
        List<FieldSchema> fields,
        List<FieldSchema> partitionFields,
        Map<String, String> hiveTableOptions)
        throws Exception {
    Identifier identifier = Identifier.create(targetDatabase, targetTable);
    boolean exists = hiveCatalog.tableExists(identifier);
    if (!exists) {
        // Only derive and register the schema when the table is absent; an existing
        // table is returned as-is so repeated migrations are idempotent here.
        Schema derivedSchema = from(fields, partitionFields, hiveTableOptions);
        hiveCatalog.createTable(identifier, derivedSchema, false);
    }
    return (AbstractFileStoreTable) hiveCatalog.getTable(identifier);
}

public Schema from(
List<FieldSchema> fields,
List<FieldSchema> partitionFields,
Map<String, String> hiveTableOptions) {
HashMap<String, String> paimonOptions = new HashMap<>(this.options);
paimonOptions.put(CoreOptions.BUCKET.key(), "-1");
// for compatible with hive comment system
if (hiveTableOptions.get("comment") != null) {
paimonOptions.put("hive.comment", hiveTableOptions.get("comment"));
}

Schema.Builder schemaBuilder =
Schema.newBuilder()
Expand Down Expand Up @@ -334,6 +345,9 @@ public MigrateTask(

@Override
public CommitMessage call() throws Exception {
if (!fileIO.exists(newDir)) {
fileIO.mkdirs(newDir);
}
List<DataFileMeta> fileMetas =
FileMetaUtils.construct(
fileIO,
Expand Down

0 comments on commit 10d5128

Please sign in to comment.