Skip to content

Commit

Permalink
[hive] Refactor codes in HiveMigrator
Browse files Browse the repository at this point in the history
  • Loading branch information
JingsongLi committed Nov 28, 2023
1 parent c7a17b2 commit 15f61c2
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,18 +43,15 @@
import org.apache.hadoop.hive.metastore.api.PrimaryKeysRequest;
import org.apache.hadoop.hive.metastore.api.Table;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import static org.apache.paimon.hive.HiveTypeUtils.toPaimonType;
Expand Down Expand Up @@ -127,23 +124,19 @@ public void executeMigrate() throws Exception {
rollBack));
}

Queue<CommitMessage> commitMessages = new LinkedBlockingQueue<>();
List<Future<?>> futures = new ArrayList<>();
tasks.forEach(
task ->
futures.add(
COMMON_IO_FORK_JOIN_POOL.submit(
() -> commitMessages.add(task.get()))));

List<Future<CommitMessage>> futures =
tasks.stream().map(COMMON_IO_FORK_JOIN_POOL::submit).collect(Collectors.toList());
List<CommitMessage> commitMessages = new ArrayList<>();
try {
for (Future<?> future : futures) {
future.get();
for (Future<CommitMessage> future : futures) {
commitMessages.add(future.get());
}
} catch (Exception e) {
futures.forEach(f -> f.cancel(true));
for (Future<?> future : futures) {
// wait all task cancelled or finished
while (!future.isDone()) {
//noinspection BusyWait
Thread.sleep(100);
}
}
Expand Down Expand Up @@ -312,7 +305,7 @@ private String parseFormat(String serder) {
}

/** One import task for one partition. */
public static class MigrateTask implements Supplier<CommitMessage> {
public static class MigrateTask implements Callable<CommitMessage> {

private final FileIO fileIO;
private final String format;
Expand Down Expand Up @@ -340,21 +333,17 @@ public MigrateTask(
}

@Override
public CommitMessage get() {
try {
List<DataFileMeta> fileMetas =
FileMetaUtils.construct(
fileIO,
format,
location,
paimonTable,
HIDDEN_PATH_FILTER,
newDir,
rollback);
return FileMetaUtils.commitFile(partitionRow, fileMetas);
} catch (IOException e) {
throw new RuntimeException("Can't get commit message", e);
}
public CommitMessage call() throws Exception {
List<DataFileMeta> fileMetas =
FileMetaUtils.construct(
fileIO,
format,
location,
paimonTable,
HIDDEN_PATH_FILTER,
newDir,
rollback);
return FileMetaUtils.commitFile(partitionRow, fileMetas);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
import java.util.Random;

/** Tests for {@link MigrateFileProcedure}. */
public class MigrateFileProcedureTest extends ActionITCaseBase {
public class MigrateFileProcedureITCase extends ActionITCaseBase {

private static final TestHiveMetastore TEST_HIVE_METASTORE = new TestHiveMetastore();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
import java.util.Random;

/** Tests for {@link MigrateFileProcedure}. */
public class MigrateTableProcedureTest extends ActionITCaseBase {
public class MigrateTableProcedureITCase extends ActionITCaseBase {

private static final TestHiveMetastore TEST_HIVE_METASTORE = new TestHiveMetastore();

Expand Down

0 comments on commit 15f61c2

Please sign in to comment.