Skip to content

Commit

Permalink
[recode] readHint.
Browse files Browse the repository at this point in the history
  • Loading branch information
askwang committed Jun 4, 2024
1 parent 4e674c2 commit 1845ae4
Show file tree
Hide file tree
Showing 6 changed files with 87 additions and 11 deletions.
14 changes: 12 additions & 2 deletions Notes
Original file line number Diff line number Diff line change
@@ -1,4 +1,14 @@

principles:
- Learn from project and do your things based on this project.
- commit type: note, recode, newcode.
- Very clearly to code, so you can copy or write it.
- Very clearly to code, so you can copy or write it.

commit type:
- note: code comments
- recode: imitate writing code
- newcode: add some new codes

tag:
- askwang-start: method recode main entry
- askwang-todo: need to learn or change.
- askwang-done: todo has been finished.
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,7 @@ default Optional<String> readOverwrittenFileUtf8(Path path) throws IOException {
IOException exception = null;
while (retryNumber++ < 5) {
try {
// askwang-todo: path判断没必要放在循环内部。
if (!exists(path)) {
return Optional.empty();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,8 @@ public int expireUntil(long earliestId, long endExclusiveId) {
// write the hint file in order to see the earliest snapshot directly next time
// should avoid duplicate writes when the file exists
if (snapshotManager.readHint(SnapshotManager.EARLIEST) == null) {
// askwang-todo: 这种条件下应该写入 earliestId 为 EARLIEST 值
// askwang-done: 这种条件下应该写入 earliestId 为 EARLIEST 值
// pr: https://github.com/apache/paimon/pull/3398
writeEarliestHint(endExclusiveId);
}

Expand Down
22 changes: 22 additions & 0 deletions paimon-core/src/main/java/org/apache/paimon/utils/FileUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,28 @@ public static Stream<FileStatus> listVersionedFileStatus(FileIO fileIO, Path dir
.filter(status -> status.getPath().getName().startsWith(prefix));
}

/**
* 列出目录下的 FileStatus
*/
public static Stream<FileStatus> listVersionedFileStatusAskwang(FileIO fileIO, Path dir, String prefix) throws IOException {
if (!fileIO.exists(dir)) {
return Stream.empty();
}

// hadoop的FileStatus[] 转换为 paimon 的 FileStatus[],为什么要这么做?
// org.apache.hadoop.fs.FileStatus[] -> org.apache.paimon.fs.FileStatus[]
FileStatus[] statuses = fileIO.listStatus(dir);
if (statuses == null) {
throw new RuntimeException(
String.format(
"The return value is null of the listStatus for the '%s' directory.",
dir));
}
return Arrays.stream(statuses)
.filter(name -> name.getPath().getName().startsWith(prefix));
}


/**
* List versioned directories for the directory.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.FileStatus;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.SeekableInputStream;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
Expand All @@ -47,6 +48,7 @@

import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;
import static org.apache.paimon.utils.BranchManager.getBranchPath;
import static org.apache.paimon.utils.FileUtils.listVersionedFileStatusAskwang;
import static org.apache.paimon.utils.FileUtils.listVersionedFiles;

/** Manager for {@link Snapshot}, providing utility methods related to paths and snapshot hints. */
Expand Down Expand Up @@ -590,22 +592,23 @@ public Snapshot traversalSnapshotsFromLatestSafely(Filter<Snapshot> checker) {
if (!fileIO.exists(dir)) {
return null;
}
Long snapshotId = readHint(EARLIEST);
Long snapshotId = readHintAskwang(EARLIEST);
if (snapshotId != null && fileIO.exists(file.apply(snapshotId))) {
return snapshotId;
}
// (a, b) -> Math.min(a, b) <==> Math:min
return findByListFiles(Math::min, dir, prefix);
return findByListFilesAskwang(Math::min, dir, prefix);
}

/**
* askwang-start
* 查找 LATEST snapshot id
*/
private Long findLatestAskwang(Path dir, String prefix, Function<Long, Path> file) throws IOException {
if (!fileIO.exists(dir)) {
return null;
}
Long snapshotId = readHint(LATEST);
Long snapshotId = readHintAskwang(LATEST);
if (snapshotId != null && snapshotId > 0) {
// check next snapshotId not existed
Long nextSnapshotId = snapshotId + 1;
Expand Down Expand Up @@ -634,11 +637,10 @@ private Long findLatestAskwang(Path dir, String prefix, Function<Long, Path> fil
* - schema id 及对应 schema
*/
private Stream<Long> listVersionedFilesAskwang(FileIO fileIO, Path dir, String prefix) throws IOException {
// askwang-todo: listStatus 需考虑更多的情况,比如目录不存在、目录为空
// askwang-done: listStatus 需考虑更多的情况,比如目录不存在、目录为空
FileStatus[] fileStatuses = fileIO.listStatus(dir);
// "EARLIEST".substring("snapshot-".length()); will throw StringIndexOutOfBoundsException, so filter first
return Arrays.stream(fileStatuses)
.filter(fileStatus -> fileStatus.getPath().getName().startsWith(prefix))
return listVersionedFileStatusAskwang(fileIO, dir, prefix)
.map(FileStatus::getPath)
.map(Path::getName)
.map(name -> name.substring(prefix.length())).map(Long::parseLong);
Expand Down Expand Up @@ -668,6 +670,43 @@ public Long readHint(String fileName, Path dir) {
return null;
}

/**
* 思路:
* 1、读取的文件路径名 path
* 2、InputStream 读取对应 path
* 3、原方法 readHint 有两层重试机制,异常处理也更周到。
*/
public Long readHintAskwang(String fileName) throws IOException {
Path dir = snapshotDirectory();
StringBuilder sBuilder = new StringBuilder();
Path path = new Path(dir, fileName);
if (!fileIO.exists(path)) {
return null;
}
int retryNum = 0;
while (retryNum++ <= READ_HINT_RETRY_NUM) {
try (InputStream in = fileIO.newInputStream(path)){
BufferedReader br = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
String line;
while ((line = br.readLine()) != null) {
sBuilder.append(line);
}
return Optional.of(sBuilder.toString()).map(Long::parseLong).orElse(null);
} catch (IOException ignored) {
// read failed do nothing.
}
// no return. represent read path failed. sleep.
try {
TimeUnit.MILLISECONDS.sleep(READ_HINT_RETRY_INTERVAL);
} catch (InterruptedException e) {
// sleep时抛异常需中断线程。
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
}
return null;
}

private Long findByListFiles(BinaryOperator<Long> reducer, Path dir, String prefix)
throws IOException {
return listVersionedFiles(fileIO, dir, prefix).reduce(reducer).orElse(null);
Expand All @@ -694,7 +733,11 @@ private void commitHint(long snapshotId, String fileName, Path dir) throws IOExc
fileIO.overwriteFileUtf8(hintFile, String.valueOf(snapshotId));
}

// snapshot dir; overwrite hdfs file;
/**
* askwang-start
* snapshot dir; overwrite hdfs file;
*
*/
public void commitEarliestHintAskwang(long snapshotId) throws IOException {
Path path = new Path(snapshotDirectory(), EARLIEST);
String content = String.valueOf(snapshotId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ class AskwangScalaITCase extends PaimonSparkTestBase {
getMapPartitionsResult(rdd).foreach(println)

println("-----after partitionBy-----")
// akwang-todo: 使用 CustomPartitioner 报错 Task not serializable
val partitionedRdd = rdd.partitionBy(new HashPartitioner(4))
getMapPartitionsResult(partitionedRdd).foreach(println)
}
Expand Down

0 comments on commit 1845ae4

Please sign in to comment.