Skip to content

Commit

Permalink
[core] ConsumerManager should use atomic overwrite for HDFS (apache#2166
Browse files Browse the repository at this point in the history
)

(cherry picked from commit 4c0023f)
  • Loading branch information
JingsongLi authored and pongandnoon committed Oct 30, 2023
1 parent fe86a1d commit 4f8faeb
Show file tree
Hide file tree
Showing 11 changed files with 260 additions and 38 deletions.
41 changes: 39 additions & 2 deletions paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,9 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.ServiceLoader;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;

import static org.apache.paimon.fs.FileIOUtils.checkAccess;
Expand Down Expand Up @@ -204,7 +204,7 @@ default boolean writeFileUtf8(Path path, String content) throws IOException {
return false;
}

Path tmp = new Path(path.getParent(), "." + path.getName() + UUID.randomUUID());
Path tmp = path.createTempPath();
boolean success = false;
try {
try (PositionOutputStream out = newOutputStream(tmp, false)) {
Expand All @@ -223,6 +223,43 @@ default boolean writeFileUtf8(Path path, String content) throws IOException {
return success;
}

/**
* Overwrite file by content atomically, different {@link FileIO}s have different atomic
* implementations.
*/
default void overwriteFileUtf8(Path path, String content) throws IOException {
try (PositionOutputStream out = newOutputStream(path, true)) {
OutputStreamWriter writer = new OutputStreamWriter(out, StandardCharsets.UTF_8);
writer.write(content);
writer.flush();
}
}

/** Read file from {@link #overwriteFileUtf8} file. */
default Optional<String> readOverwrittenFileUtf8(Path path) throws IOException {
int retryNumber = 0;
IOException exception = null;
while (retryNumber++ < 5) {
try {
if (!exists(path)) {
return Optional.empty();
}

return Optional.of(readFileUtf8(path));
} catch (IOException e) {
if (e.getClass()
.getName()
.endsWith("org.apache.hadoop.fs.s3a.RemoteFileChangedException")) {
exception = e;
} else {
throw e;
}
}
}

throw exception;
}

// -------------------------------------------------------------------------
// static creator
// -------------------------------------------------------------------------
Expand Down
6 changes: 6 additions & 0 deletions paimon-common/src/main/java/org/apache/paimon/fs/Path.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.io.Serializable;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.UUID;
import java.util.regex.Pattern;

/**
Expand Down Expand Up @@ -318,6 +319,11 @@ public String getName() {
return path.substring(slash + 1);
}

/** Create a temporary path (to be used as a copy) for this path. */
public Path createTempPath() {
return new Path(getParent(), String.format(".%s.%s.tmp", getName(), UUID.randomUUID()));
}

/**
* Returns the parent of a path or null if at root.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,19 @@
import org.apache.paimon.fs.PositionOutputStream;
import org.apache.paimon.fs.SeekableInputStream;
import org.apache.paimon.hadoop.SerializableConfiguration;
import org.apache.paimon.utils.ReflectionUtils;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Options;

import java.io.IOException;
import java.io.OutputStreamWriter;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicReference;

/** Hadoop {@link FileIO}. */
public class HadoopFileIO implements FileIO {
Expand Down Expand Up @@ -116,6 +123,14 @@ public boolean rename(Path src, Path dst) throws IOException {
return getFileSystem(hadoopSrc).rename(hadoopSrc, hadoopDst);
}

@Override
public void overwriteFileUtf8(Path path, String content) throws IOException {
boolean success = tryAtomicOverwriteViaRename(path, content);
if (!success) {
FileIO.super.overwriteFileUtf8(path, content);
}
}

private org.apache.hadoop.fs.Path path(Path path) {
return new org.apache.hadoop.fs.Path(path.toUri());
}
Expand Down Expand Up @@ -290,4 +305,85 @@ public long getModificationTime() {
return status.getModificationTime();
}
}

// ============================== extra methods ===================================

private transient volatile AtomicReference<Method> renameMethodRef;

public boolean tryAtomicOverwriteViaRename(Path dst, String content) throws IOException {
org.apache.hadoop.fs.Path hadoopDst = path(dst);
FileSystem fs = getFileSystem(hadoopDst);

if (renameMethodRef == null) {
synchronized (this) {
if (renameMethodRef == null) {
Method method;
// Implementation in FileSystem is incorrect, not atomic, Object Storage like S3
// and OSS not override it
// DistributedFileSystem and ViewFileSystem override the rename method to public
// and implement correct renaming
try {
method = ReflectionUtils.getMethod(fs.getClass(), "rename", 3);
} catch (NoSuchMethodException e) {
method = null;
}
renameMethodRef = new AtomicReference<>(method);
}
}
}

Method renameMethod = renameMethodRef.get();
if (renameMethod == null) {
return false;
}

boolean renameDone = false;

// write tempPath
Path tempPath = dst.createTempPath();
org.apache.hadoop.fs.Path hadoopTemp = path(tempPath);
try {
try (PositionOutputStream out = newOutputStream(tempPath, false)) {
OutputStreamWriter writer = new OutputStreamWriter(out, StandardCharsets.UTF_8);
writer.write(content);
writer.flush();
}

renameMethod.invoke(
fs, hadoopTemp, hadoopDst, new Options.Rename[] {Options.Rename.OVERWRITE});
renameDone = true;
// TODO: this is a workaround of HADOOP-16255 - remove this when HADOOP-16255 is
// resolved
tryRemoveCrcFile(hadoopTemp);
return true;
} catch (InvocationTargetException | IllegalAccessException e) {
throw new IOException(e);
} finally {
if (!renameDone) {
deleteQuietly(tempPath);
}
}
}

/** @throws IOException if a fatal exception occurs. Will try to ignore most exceptions. */
@SuppressWarnings("CatchMayIgnoreException")
private void tryRemoveCrcFile(org.apache.hadoop.fs.Path path) throws IOException {
try {
final org.apache.hadoop.fs.Path checksumFile =
new org.apache.hadoop.fs.Path(
path.getParent(), String.format(".%s.crc", path.getName()));

if (fs.exists(checksumFile)) {
// checksum file exists, deleting it
fs.delete(checksumFile, true); // recursive=true
}
} catch (Throwable t) {
if (t instanceof VirtualMachineError
|| t instanceof ThreadDeath
|| t instanceof LinkageError) {
throw t;
}
// else, ignore - we are removing crc file as "best-effort"
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,21 @@ public static <T> T invokeStaticMethod(Method method, Object... args)
throws InvocationTargetException, IllegalAccessException {
return (T) method.invoke(null, args);
}

public static Method getMethod(Class<?> clz, String methodName, int argSize)
throws NoSuchMethodException {
Method method = null;
Method[] methods = clz.getMethods();
for (Method m : methods) {
if (methodName.equals(m.getName()) && m.getParameterTypes().length == argSize) {
method = m;
break;
}
}

if (method == null) {
throw new NoSuchMethodException(methodName);
}
return method;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ void testMkdirsFailsWithExistingParentFile() throws Exception {
// Utilities
// ------------------------------------------------------------------------

private static String randomName() {
protected static String randomName() {
return StringUtils.getRandomString(RND, 16, 16, 'a', 'z');
}

Expand Down
50 changes: 50 additions & 0 deletions paimon-common/src/test/java/org/apache/paimon/fs/FileIOTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import org.junit.jupiter.api.io.TempDir;

import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;

import static org.assertj.core.api.Assertions.assertThat;

Expand Down Expand Up @@ -56,4 +58,52 @@ public void testRequireOptions() throws IOException {
CatalogContext.create(options));
assertThat(fileIO).isInstanceOf(RequireOptionsFileIOLoader.MyFileIO.class);
}

public static void testOverwriteFileUtf8(Path file, FileIO fileIO) throws InterruptedException {
AtomicReference<Exception> exception = new AtomicReference<>();
final int max = 10;

Thread writeThread =
new Thread(
() -> {
for (int i = 0; i <= max; i++) {
try {
fileIO.overwriteFileUtf8(file, "" + i);
Thread.sleep(100);
} catch (Exception e) {
exception.set(e);
return;
}
}
});

Thread readThread =
new Thread(
() -> {
while (true) {
try {
Optional<String> ret = fileIO.readOverwrittenFileUtf8(file);
if (!ret.isPresent()) {
continue;
}

int value = Integer.parseInt(ret.get());
if (value == max) {
return;
}
} catch (Exception e) {
exception.set(e);
return;
}
}
});

writeThread.start();
readThread.start();

writeThread.join();
readThread.join();

assertThat(exception.get()).isNull();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,13 @@
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

import java.io.File;
import java.io.IOException;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assumptions.assumeThat;

/** Behavior tests for HDFS. */
Expand Down Expand Up @@ -84,4 +87,19 @@ protected FileIO getFileSystem() {
protected Path getBasePath() {
return basePath;
}

@Test
public void testAtomicWrite() throws IOException {
Path file = new Path(getBasePath(), randomName());
fs.tryAtomicOverwriteViaRename(file, "Hi");
assertThat(fs.readFileUtf8(file)).isEqualTo("Hi");

fs.tryAtomicOverwriteViaRename(file, "Hello");
assertThat(fs.readFileUtf8(file)).isEqualTo("Hello");
}

@Test
public void testAtomicWriteMultipleThreads() throws InterruptedException {
FileIOTest.testOverwriteFileUtf8(new Path(getBasePath(), randomName()), fs);
}
}
32 changes: 6 additions & 26 deletions paimon-core/src/main/java/org/apache/paimon/consumer/Consumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,9 @@
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Optional;
import java.util.concurrent.TimeUnit;

/** Consumer which contains next snapshot. */
public class Consumer {
Expand Down Expand Up @@ -58,31 +59,10 @@ public static Consumer fromJson(String json) {
}

public static Optional<Consumer> fromPath(FileIO fileIO, Path path) {
// Consumer updating uses FileIO.newOutputStream(..., overwrite).
// But this API may have some intermediate state, the file maybe empty
// So retry here to avoid exception when the file is intermediate state
int retryNumber = 0;
Exception exception = null;
while (retryNumber++ < READ_CONSUMER_RETRY_NUM) {
try {
if (!fileIO.exists(path)) {
return Optional.empty();
}

String json = fileIO.readFileUtf8(path);
return Optional.of(Consumer.fromJson(json));
} catch (Exception e) {
exception = e;
}

try {
TimeUnit.MILLISECONDS.sleep(READ_CONSUMER_RETRY_INTERVAL);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
try {
return fileIO.readOverwrittenFileUtf8(path).map(Consumer::fromJson);
} catch (IOException e) {
throw new UncheckedIOException(e);
}

throw new RuntimeException("Fails to read snapshot from path " + path, exception);
}
}
Loading

0 comments on commit 4f8faeb

Please sign in to comment.