From d5290fc7e7fb8f256d1b3ee026812b9ec4762927 Mon Sep 17 00:00:00 2001 From: Jingsong Lee Date: Tue, 30 Jul 2024 22:40:49 +0800 Subject: [PATCH] [core] Reduce File IO for snapshot read (#3849) --- .../src/main/java/org/apache/paimon/fs/FileIO.java | 13 +++++++++---- .../src/main/java/org/apache/paimon/Snapshot.java | 10 ++++++---- .../table/source/snapshot/FullStartingScanner.java | 5 ++++- .../org/apache/paimon/utils/SnapshotManager.java | 8 -------- .../paimon/table/TableFormatReadWriteTest.java | 4 ++-- .../table/TableFormatReadWriteWithPkTest.java | 4 ++-- .../apache/paimon/spark/PaimonHiveTestBase.scala | 1 + .../apache/paimon/spark/PaimonSparkTestBase.scala | 1 + ...ureTest.scala => FastForwardProcedureTest.scala} | 0 9 files changed, 25 insertions(+), 21 deletions(-) rename paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/{MergeBranchProcedureTest.scala => FastForwardProcedureTest.scala} (100%) diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java b/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java index cd9022e232bf..fe2e31b8db7b 100644 --- a/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java +++ b/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java @@ -297,15 +297,17 @@ default void copyFiles(Path sourceDirectory, Path targetDirectory, boolean overw /** Read file from {@link #overwriteFileUtf8} file. */ default Optional readOverwrittenFileUtf8(Path path) throws IOException { int retryNumber = 0; - IOException exception = null; + Exception exception = null; while (retryNumber++ < 5) { try { + return Optional.of(readFileUtf8(path)); + } catch (FileNotFoundException e) { + return Optional.empty(); + } catch (Exception e) { if (!exists(path)) { return Optional.empty(); } - return Optional.of(readFileUtf8(path)); - } catch (IOException e) { if (e.getClass() .getName() .endsWith("org.apache.hadoop.fs.s3a.RemoteFileChangedException")) { @@ -322,7 +324,10 @@ default Optional readOverwrittenFileUtf8(Path path) throws IOException { } } - throw exception; + if (exception instanceof IOException) { + throw (IOException) exception; + } + throw new RuntimeException(exception); } // ------------------------------------------------------------------------- diff --git a/paimon-core/src/main/java/org/apache/paimon/Snapshot.java b/paimon-core/src/main/java/org/apache/paimon/Snapshot.java index 6c2656d39447..6102c321db60 100644 --- a/paimon-core/src/main/java/org/apache/paimon/Snapshot.java +++ b/paimon-core/src/main/java/org/apache/paimon/Snapshot.java @@ -440,8 +440,7 @@ public static Snapshot fromJson(String json) { public static Snapshot fromPath(FileIO fileIO, Path path) { try { - String json = fileIO.readFileUtf8(path); - return Snapshot.fromJson(json); + return fromPathThrowsException(fileIO, path); } catch (IOException e) { throw new RuntimeException("Fails to read snapshot from path " + path, e); } @@ -450,13 +449,16 @@ public static Snapshot fromPath(FileIO fileIO, Path path) { @Nullable public static Snapshot safelyFromPath(FileIO fileIO, Path path) throws IOException { try { - String json = fileIO.readFileUtf8(path); - return Snapshot.fromJson(json); + return fromPathThrowsException(fileIO, path); } catch (FileNotFoundException e) { return null; } } + private static Snapshot fromPathThrowsException(FileIO fileIO, Path path) throws IOException { + return Snapshot.fromJson(fileIO.readFileUtf8(path)); + } + @Override public int hashCode() { return Objects.hash( diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FullStartingScanner.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FullStartingScanner.java index c177be7eafb7..fc9b49d2d60a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FullStartingScanner.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FullStartingScanner.java @@ -42,7 +42,10 @@ public ScanMode startingScanMode() { @Override public Result scan(SnapshotReader snapshotReader) { - Long startingSnapshotId = snapshotManager.latestSnapshotId(); + if (startingSnapshotId == null) { + // try to get first snapshot again + startingSnapshotId = snapshotManager.latestSnapshotId(); + } if (startingSnapshotId == null) { LOG.debug("There is currently no snapshot. Waiting for snapshot generation."); return new NoSnapshot(); diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java index cf0b44b5b4eb..48627957c0e8 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java @@ -611,10 +611,6 @@ public Snapshot traversalSnapshotsFromLatestSafely(Filter checker) { private @Nullable Long findLatest(Path dir, String prefix, Function file) throws IOException { - if (!fileIO.exists(dir)) { - return null; - } - Long snapshotId = readHint(LATEST, dir); if (snapshotId != null && snapshotId > 0) { long nextSnapshot = snapshotId + 1; @@ -628,10 +624,6 @@ public Snapshot traversalSnapshotsFromLatestSafely(Filter checker) { private @Nullable Long findEarliest(Path dir, String prefix, Function file) throws IOException { - if (!fileIO.exists(dir)) { - return null; - } - Long snapshotId = readHint(EARLIEST, dir); // null and it is the earliest only it exists if (snapshotId != null && fileIO.exists(file.apply(snapshotId))) { diff --git a/paimon-core/src/test/java/org/apache/paimon/table/TableFormatReadWriteTest.java b/paimon-core/src/test/java/org/apache/paimon/table/TableFormatReadWriteTest.java index 070923249a5a..d303dd862e47 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/TableFormatReadWriteTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/TableFormatReadWriteTest.java @@ -34,8 +34,8 @@ public class TableFormatReadWriteTest extends TableTestBase { private Table createTable(String format) throws Exception { - catalog.createTable(identifier(), schema(format), true); - return catalog.getTable(identifier()); + catalog.createTable(identifier(format), schema(format), true); + return catalog.getTable(identifier(format)); } private Schema schema(String format) { diff --git a/paimon-core/src/test/java/org/apache/paimon/table/TableFormatReadWriteWithPkTest.java b/paimon-core/src/test/java/org/apache/paimon/table/TableFormatReadWriteWithPkTest.java index 9fc3e656dfa7..8adfcdc89415 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/TableFormatReadWriteWithPkTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/TableFormatReadWriteWithPkTest.java @@ -34,8 +34,8 @@ public class TableFormatReadWriteWithPkTest extends TableTestBase { private Table createTable(String format) throws Exception { - catalog.createTable(identifier(), schema(format), true); - return catalog.getTable(identifier()); + catalog.createTable(identifier(format), schema(format), true); + return catalog.getTable(identifier(format)); } private Schema schema(String format) { diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonHiveTestBase.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonHiveTestBase.scala index 842147615d1a..ccd705e26967 100644 --- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonHiveTestBase.scala +++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonHiveTestBase.scala @@ -18,6 +18,7 @@ package org.apache.paimon.spark +import org.apache.paimon.Snapshot import org.apache.paimon.hive.TestHiveMetastore import org.apache.hadoop.conf.Configuration diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala index 983dd037f131..19e711a600a9 100644 --- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala +++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala @@ -18,6 +18,7 @@ package org.apache.paimon.spark +import org.apache.paimon.Snapshot import org.apache.paimon.catalog.{Catalog, CatalogContext, CatalogFactory, Identifier} import org.apache.paimon.options.{CatalogOptions, Options} import org.apache.paimon.spark.catalog.Catalogs diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/MergeBranchProcedureTest.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/FastForwardProcedureTest.scala similarity index 100% rename from paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/MergeBranchProcedureTest.scala rename to paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/FastForwardProcedureTest.scala