diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java b/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java index 819d70223303..a5e31ccd5139 100644 --- a/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java +++ b/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java @@ -42,9 +42,9 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.ServiceLoader; import java.util.Set; -import java.util.UUID; import java.util.stream.Collectors; import static org.apache.paimon.fs.FileIOUtils.checkAccess; @@ -204,7 +204,7 @@ default boolean writeFileUtf8(Path path, String content) throws IOException { return false; } - Path tmp = new Path(path.getParent(), "." + path.getName() + UUID.randomUUID()); + Path tmp = path.createTempPath(); boolean success = false; try { try (PositionOutputStream out = newOutputStream(tmp, false)) { @@ -223,6 +223,43 @@ default boolean writeFileUtf8(Path path, String content) throws IOException { return success; } + /** + * Overwrite file by content atomically, different {@link FileIO}s have different atomic + * implementations. + */ + default void overwriteFileUtf8(Path path, String content) throws IOException { + try (PositionOutputStream out = newOutputStream(path, true)) { + OutputStreamWriter writer = new OutputStreamWriter(out, StandardCharsets.UTF_8); + writer.write(content); + writer.flush(); + } + } + + /** Read file from {@link #overwriteFileUtf8} file. */ + default Optional readOverwrittenFileUtf8(Path path) throws IOException { + int retryNumber = 0; + IOException exception = null; + while (retryNumber++ < 5) { + try { + if (!exists(path)) { + return Optional.empty(); + } + + return Optional.of(readFileUtf8(path)); + } catch (IOException e) { + if (e.getClass() + .getName() + .endsWith("org.apache.hadoop.fs.s3a.RemoteFileChangedException")) { + exception = e; + } else { + throw e; + } + } + } + + throw exception; + } + // ------------------------------------------------------------------------- // static creator // ------------------------------------------------------------------------- diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/Path.java b/paimon-common/src/main/java/org/apache/paimon/fs/Path.java index 96b2b9e01313..87a9ce7b03f8 100644 --- a/paimon-common/src/main/java/org/apache/paimon/fs/Path.java +++ b/paimon-common/src/main/java/org/apache/paimon/fs/Path.java @@ -28,6 +28,7 @@ import java.io.Serializable; import java.net.URI; import java.net.URISyntaxException; +import java.util.UUID; import java.util.regex.Pattern; /** @@ -318,6 +319,11 @@ public String getName() { return path.substring(slash + 1); } + /** Create a temporary path (to be used as a copy) for this path. */ + public Path createTempPath() { + return new Path(getParent(), String.format(".%s.%s.tmp", getName(), UUID.randomUUID())); + } + /** * Returns the parent of a path or null if at root. * diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/hadoop/HadoopFileIO.java b/paimon-common/src/main/java/org/apache/paimon/fs/hadoop/HadoopFileIO.java index dd15b528066e..7074341a7efe 100644 --- a/paimon-common/src/main/java/org/apache/paimon/fs/hadoop/HadoopFileIO.java +++ b/paimon-common/src/main/java/org/apache/paimon/fs/hadoop/HadoopFileIO.java @@ -26,12 +26,19 @@ import org.apache.paimon.fs.PositionOutputStream; import org.apache.paimon.fs.SeekableInputStream; import org.apache.paimon.hadoop.SerializableConfiguration; +import org.apache.paimon.utils.ReflectionUtils; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Options; import java.io.IOException; +import java.io.OutputStreamWriter; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.atomic.AtomicReference; /** Hadoop {@link FileIO}. */ public class HadoopFileIO implements FileIO { @@ -116,6 +123,14 @@ public boolean rename(Path src, Path dst) throws IOException { return getFileSystem(hadoopSrc).rename(hadoopSrc, hadoopDst); } + @Override + public void overwriteFileUtf8(Path path, String content) throws IOException { + boolean success = tryAtomicOverwriteViaRename(path, content); + if (!success) { + FileIO.super.overwriteFileUtf8(path, content); + } + } + private org.apache.hadoop.fs.Path path(Path path) { return new org.apache.hadoop.fs.Path(path.toUri()); } @@ -290,4 +305,85 @@ public long getModificationTime() { return status.getModificationTime(); } } + + // ============================== extra methods =================================== + + private transient volatile AtomicReference renameMethodRef; + + public boolean tryAtomicOverwriteViaRename(Path dst, String content) throws IOException { + org.apache.hadoop.fs.Path hadoopDst = path(dst); + FileSystem fs = getFileSystem(hadoopDst); + + if (renameMethodRef == null) { + synchronized (this) { + if (renameMethodRef == null) { + Method method; + // Implementation in FileSystem is incorrect, not atomic, Object Storage like S3 + // and OSS not override it + // DistributedFileSystem and ViewFileSystem override the rename method to public + // and implement correct renaming + try { + method = ReflectionUtils.getMethod(fs.getClass(), "rename", 3); + } catch (NoSuchMethodException e) { + method = null; + } + renameMethodRef = new AtomicReference<>(method); + } + } + } + + Method renameMethod = renameMethodRef.get(); + if (renameMethod == null) { + return false; + } + + boolean renameDone = false; + + // write tempPath + Path tempPath = dst.createTempPath(); + org.apache.hadoop.fs.Path hadoopTemp = path(tempPath); + try { + try (PositionOutputStream out = newOutputStream(tempPath, false)) { + OutputStreamWriter writer = new OutputStreamWriter(out, StandardCharsets.UTF_8); + writer.write(content); + writer.flush(); + } + + renameMethod.invoke( + fs, hadoopTemp, hadoopDst, new Options.Rename[] {Options.Rename.OVERWRITE}); + renameDone = true; + // TODO: this is a workaround of HADOOP-16255 - remove this when HADOOP-16255 is + // resolved + tryRemoveCrcFile(hadoopTemp); + return true; + } catch (InvocationTargetException | IllegalAccessException e) { + throw new IOException(e); + } finally { + if (!renameDone) { + deleteQuietly(tempPath); + } + } + } + + /** @throws IOException if a fatal exception occurs. Will try to ignore most exceptions. */ + @SuppressWarnings("CatchMayIgnoreException") + private void tryRemoveCrcFile(org.apache.hadoop.fs.Path path) throws IOException { + try { + final org.apache.hadoop.fs.Path checksumFile = + new org.apache.hadoop.fs.Path( + path.getParent(), String.format(".%s.crc", path.getName())); + + if (fs.exists(checksumFile)) { + // checksum file exists, deleting it + fs.delete(checksumFile, true); // recursive=true + } + } catch (Throwable t) { + if (t instanceof VirtualMachineError + || t instanceof ThreadDeath + || t instanceof LinkageError) { + throw t; + } + // else, ignore - we are removing crc file as "best-effort" + } + } } diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/ReflectionUtils.java b/paimon-common/src/main/java/org/apache/paimon/utils/ReflectionUtils.java index bf14043f3ae9..d523b099d425 100644 --- a/paimon-common/src/main/java/org/apache/paimon/utils/ReflectionUtils.java +++ b/paimon-common/src/main/java/org/apache/paimon/utils/ReflectionUtils.java @@ -47,4 +47,21 @@ public static T invokeStaticMethod(Method method, Object... args) throws InvocationTargetException, IllegalAccessException { return (T) method.invoke(null, args); } + + public static Method getMethod(Class clz, String methodName, int argSize) + throws NoSuchMethodException { + Method method = null; + Method[] methods = clz.getMethods(); + for (Method m : methods) { + if (methodName.equals(m.getName()) && m.getParameterTypes().length == argSize) { + method = m; + break; + } + } + + if (method == null) { + throw new NoSuchMethodException(methodName); + } + return method; + } } diff --git a/paimon-common/src/test/java/org/apache/paimon/fs/FileIOBehaviorTestBase.java b/paimon-common/src/test/java/org/apache/paimon/fs/FileIOBehaviorTestBase.java index 3b6ae7ac3dfe..106dee38e3fe 100644 --- a/paimon-common/src/test/java/org/apache/paimon/fs/FileIOBehaviorTestBase.java +++ b/paimon-common/src/test/java/org/apache/paimon/fs/FileIOBehaviorTestBase.java @@ -227,7 +227,7 @@ void testMkdirsFailsWithExistingParentFile() throws Exception { // Utilities // ------------------------------------------------------------------------ - private static String randomName() { + protected static String randomName() { return StringUtils.getRandomString(RND, 16, 16, 'a', 'z'); } diff --git a/paimon-common/src/test/java/org/apache/paimon/fs/FileIOTest.java b/paimon-common/src/test/java/org/apache/paimon/fs/FileIOTest.java index 67064ff7b4dd..c1b86a0b20ea 100644 --- a/paimon-common/src/test/java/org/apache/paimon/fs/FileIOTest.java +++ b/paimon-common/src/test/java/org/apache/paimon/fs/FileIOTest.java @@ -26,6 +26,8 @@ import org.junit.jupiter.api.io.TempDir; import java.io.IOException; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicReference; import static org.assertj.core.api.Assertions.assertThat; @@ -56,4 +58,52 @@ public void testRequireOptions() throws IOException { CatalogContext.create(options)); assertThat(fileIO).isInstanceOf(RequireOptionsFileIOLoader.MyFileIO.class); } + + public static void testOverwriteFileUtf8(Path file, FileIO fileIO) throws InterruptedException { + AtomicReference exception = new AtomicReference<>(); + final int max = 10; + + Thread writeThread = + new Thread( + () -> { + for (int i = 0; i <= max; i++) { + try { + fileIO.overwriteFileUtf8(file, "" + i); + Thread.sleep(100); + } catch (Exception e) { + exception.set(e); + return; + } + } + }); + + Thread readThread = + new Thread( + () -> { + while (true) { + try { + Optional ret = fileIO.readOverwrittenFileUtf8(file); + if (!ret.isPresent()) { + continue; + } + + int value = Integer.parseInt(ret.get()); + if (value == max) { + return; + } + } catch (Exception e) { + exception.set(e); + return; + } + } + }); + + writeThread.start(); + readThread.start(); + + writeThread.join(); + readThread.join(); + + assertThat(exception.get()).isNull(); + } } diff --git a/paimon-common/src/test/java/org/apache/paimon/fs/HdfsBehaviorTest.java b/paimon-common/src/test/java/org/apache/paimon/fs/HdfsBehaviorTest.java index 1939d98a9abc..df5eb7ba7510 100644 --- a/paimon-common/src/test/java/org/apache/paimon/fs/HdfsBehaviorTest.java +++ b/paimon-common/src/test/java/org/apache/paimon/fs/HdfsBehaviorTest.java @@ -25,10 +25,13 @@ import org.apache.hadoop.hdfs.MiniDFSCluster; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; import java.io.File; +import java.io.IOException; +import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assumptions.assumeThat; /** Behavior tests for HDFS. */ @@ -84,4 +87,19 @@ protected FileIO getFileSystem() { protected Path getBasePath() { return basePath; } + + @Test + public void testAtomicWrite() throws IOException { + Path file = new Path(getBasePath(), randomName()); + fs.tryAtomicOverwriteViaRename(file, "Hi"); + assertThat(fs.readFileUtf8(file)).isEqualTo("Hi"); + + fs.tryAtomicOverwriteViaRename(file, "Hello"); + assertThat(fs.readFileUtf8(file)).isEqualTo("Hello"); + } + + @Test + public void testAtomicWriteMultipleThreads() throws InterruptedException { + FileIOTest.testOverwriteFileUtf8(new Path(getBasePath(), randomName()), fs); + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/consumer/Consumer.java b/paimon-core/src/main/java/org/apache/paimon/consumer/Consumer.java index 1d0bda823478..0aad5bd63584 100644 --- a/paimon-core/src/main/java/org/apache/paimon/consumer/Consumer.java +++ b/paimon-core/src/main/java/org/apache/paimon/consumer/Consumer.java @@ -26,8 +26,9 @@ import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter; import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty; +import java.io.IOException; +import java.io.UncheckedIOException; import java.util.Optional; -import java.util.concurrent.TimeUnit; /** Consumer which contains next snapshot. */ public class Consumer { @@ -58,31 +59,10 @@ public static Consumer fromJson(String json) { } public static Optional fromPath(FileIO fileIO, Path path) { - // Consumer updating uses FileIO.newOutputStream(..., overwrite). - // But this API may have some intermediate state, the file maybe empty - // So retry here to avoid exception when the file is intermediate state - int retryNumber = 0; - Exception exception = null; - while (retryNumber++ < READ_CONSUMER_RETRY_NUM) { - try { - if (!fileIO.exists(path)) { - return Optional.empty(); - } - - String json = fileIO.readFileUtf8(path); - return Optional.of(Consumer.fromJson(json)); - } catch (Exception e) { - exception = e; - } - - try { - TimeUnit.MILLISECONDS.sleep(READ_CONSUMER_RETRY_INTERVAL); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException(e); - } + try { + return fileIO.readOverwrittenFileUtf8(path).map(Consumer::fromJson); + } catch (IOException e) { + throw new UncheckedIOException(e); } - - throw new RuntimeException("Fails to read snapshot from path " + path, exception); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/consumer/ConsumerManager.java b/paimon-core/src/main/java/org/apache/paimon/consumer/ConsumerManager.java index b788c4a28672..6a928b81cdb3 100644 --- a/paimon-core/src/main/java/org/apache/paimon/consumer/ConsumerManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/consumer/ConsumerManager.java @@ -20,14 +20,11 @@ import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; -import org.apache.paimon.fs.PositionOutputStream; import org.apache.paimon.utils.DateTimeUtils; import java.io.IOException; -import java.io.OutputStreamWriter; import java.io.Serializable; import java.io.UncheckedIOException; -import java.nio.charset.StandardCharsets; import java.time.LocalDateTime; import java.util.HashMap; import java.util.List; @@ -59,10 +56,8 @@ public Optional consumer(String consumerId) { } public void resetConsumer(String consumerId, Consumer consumer) { - try (PositionOutputStream out = fileIO.newOutputStream(consumerPath(consumerId), true)) { - OutputStreamWriter writer = new OutputStreamWriter(out, StandardCharsets.UTF_8); - writer.write(consumer.toJson()); - writer.flush(); + try { + fileIO.overwriteFileUtf8(consumerPath(consumerId), consumer.toJson()); } catch (IOException e) { throw new UncheckedIOException(e); } diff --git a/paimon-spark/paimon-spark-common/pom.xml b/paimon-spark/paimon-spark-common/pom.xml index 05975ede9ebc..a7414bb780e0 100644 --- a/paimon-spark/paimon-spark-common/pom.xml +++ b/paimon-spark/paimon-spark-common/pom.xml @@ -178,6 +178,13 @@ under the License. 3.1.0 test + + org.apache.paimon + paimon-common + ${project.version} + test-jar + test + org.apache.paimon paimon-hive-common diff --git a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkS3ITCase.java b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkS3ITCase.java index 7a27d7d03732..22c9f7175c9a 100644 --- a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkS3ITCase.java +++ b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkS3ITCase.java @@ -18,7 +18,11 @@ package org.apache.paimon.spark; +import org.apache.paimon.catalog.CatalogContext; +import org.apache.paimon.fs.FileIO; +import org.apache.paimon.fs.FileIOTest; import org.apache.paimon.fs.Path; +import org.apache.paimon.options.Options; import org.apache.paimon.s3.MinioTestContainer; import org.apache.paimon.testutils.junit.parameterized.ParameterizedTestExtension; import org.apache.paimon.testutils.junit.parameterized.Parameters; @@ -32,6 +36,7 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.extension.RegisterExtension; +import java.io.IOException; import java.util.Arrays; import java.util.Collection; import java.util.List; @@ -46,12 +51,14 @@ public class SparkS3ITCase { @RegisterExtension public static final MinioTestContainer MINIO_CONTAINER = new MinioTestContainer(); + private static Path warehousePath; + private static SparkSession spark = null; @BeforeAll public static void startMetastoreAndSpark() { String path = MINIO_CONTAINER.getS3UriForDefaultBucket() + "/" + UUID.randomUUID(); - Path warehousePath = new Path(path); + warehousePath = new Path(path); spark = SparkSession.builder().master("local[2]").getOrCreate(); spark.conf().set("spark.sql.catalog.paimon", SparkCatalog.class.getName()); spark.conf().set("spark.sql.catalog.paimon.warehouse", warehousePath.toString()); @@ -83,7 +90,7 @@ public SparkS3ITCase(String format) { @AfterEach public void afterEach() { - spark.sql("DROP TABLE T"); + spark.sql("DROP TABLE IF EXISTS T"); } @TestTemplate @@ -97,4 +104,13 @@ public void testWriteRead() { List rows = spark.sql("SELECT * FROM T").collectAsList(); assertThat(rows.toString()).isEqualTo("[[1,2,3]]"); } + + @TestTemplate + public void testS3AtomicWriteMultipleThreads() throws InterruptedException, IOException { + Path file = new Path(warehousePath, UUID.randomUUID().toString()); + Options options = new Options(); + MINIO_CONTAINER.getS3ConfigOptions().forEach(options::setString); + FileIO fileIO = FileIO.get(file, CatalogContext.create(options)); + FileIOTest.testOverwriteFileUtf8(file, fileIO); + } }