Skip to content

Commit

Permalink
compile
Browse files Browse the repository at this point in the history
  • Loading branch information
askwang committed Jun 13, 2024
1 parent 7ee8be9 commit 04c3be2
Show file tree
Hide file tree
Showing 9 changed files with 166 additions and 141 deletions.
3 changes: 2 additions & 1 deletion paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,8 @@ default Optional<String> readOverwrittenFileUtf8(Path path) throws IOException {
IOException exception = null;
while (retryNumber++ < 5) {
try {
// askwang-todo: path判断没必要放在循环内部。
// askwang-done: path判断没必要放在循环内部。
// 合理的,每次 retry 时读取需要重新判断 path 是否存在
if (!exists(path)) {
return Optional.empty();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ public List<String> bucketKeys() {
if (bucketKeys.isEmpty()) {
bucketKeys = trimmedPrimaryKeys();
}
// askwang-todo:这里为什么把 fieldNames 判断去掉了?
return bucketKeys;
}

Expand Down
17 changes: 7 additions & 10 deletions paimon-core/src/main/java/org/apache/paimon/utils/FileUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,9 @@ public static Stream<FileStatus> listVersionedFileStatus(FileIO fileIO, Path dir
.filter(status -> status.getPath().getName().startsWith(prefix));
}

/**
* 列出目录下的 FileStatus
*/
public static Stream<FileStatus> listVersionedFileStatusAskwang(FileIO fileIO, Path dir, String prefix) throws IOException {
/** 列出目录下的 FileStatus. */
public static Stream<FileStatus> listVersionedFileStatusAskwang(
FileIO fileIO, Path dir, String prefix) throws IOException {
if (!fileIO.exists(dir)) {
return Stream.empty();
}
Expand All @@ -131,15 +130,13 @@ public static Stream<FileStatus> listVersionedFileStatusAskwang(FileIO fileIO, P
FileStatus[] statuses = fileIO.listStatus(dir);
if (statuses == null) {
throw new RuntimeException(
String.format(
"The return value is null of the listStatus for the '%s' directory.",
dir));
String.format(
"The return value is null of the listStatus for the '%s' directory.",
dir));
}
return Arrays.stream(statuses)
.filter(name -> name.getPath().getName().startsWith(prefix));
return Arrays.stream(statuses).filter(name -> name.getPath().getName().startsWith(prefix));
}


/**
* List versioned directories for the directory.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,18 @@
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.FileStatus;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.SeekableInputStream;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import java.io.*;
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -583,12 +589,11 @@ public Snapshot traversalSnapshotsFromLatestSafely(Filter<Snapshot> checker) {
}

/**
* 查找 EARLIEST snapshot id
* 1、snapshot dir path 不存在,return null
* 2、读取 snapshot dir 下 EARLIEST 文件内容 x,判断 snapshot-x 文件是否存在,存在则返回 EARLIEST 文件内的值
* 3、否则,扫描 snapshot dir 目录,去 snapshot-* 的最小值
* 查找 EARLIEST snapshot id. 1、snapshot dir path 不存在,return null 2、读取 snapshot dir 下 EARLIEST
* 文件内容 x,判断 snapshot-x 文件是否存在,存在则返回 EARLIEST 文件内的值 3、否则,扫描 snapshot dir 目录,去 snapshot-* 的最小值
*/
private @Nullable Long findEarliestAskwang(Path dir, String prefix, Function<Long, Path> file) throws IOException {
private @Nullable Long findEarliestAskwang(Path dir, String prefix, Function<Long, Path> file)
throws IOException {
if (!fileIO.exists(dir)) {
return null;
}
Expand All @@ -600,11 +605,9 @@ public Snapshot traversalSnapshotsFromLatestSafely(Filter<Snapshot> checker) {
return findByListFilesAskwang(Math::min, dir, prefix);
}

/**
* askwang-start
* 查找 LATEST snapshot id
*/
private Long findLatestAskwang(Path dir, String prefix, Function<Long, Path> file) throws IOException {
/** askwang-start. 查找 LATEST snapshot id。 */
private Long findLatestAskwang(Path dir, String prefix, Function<Long, Path> file)
throws IOException {
if (!fileIO.exists(dir)) {
return null;
}
Expand All @@ -619,34 +622,30 @@ private Long findLatestAskwang(Path dir, String prefix, Function<Long, Path> fil
return findByListFilesAskwang(Math::max, dir, prefix);
}

/**
* 列出 snapshot 目录下以 prefix 为前缀的文件的值,并根据函数 reducer 进行处理
*/
private @Nullable Long findByListFilesAskwang(BinaryOperator<Long> reducer, Path dir, String prefix) throws IOException {
return listVersionedFilesAskwang(fileIO, dir, prefix)
.reduce(reducer).orElse(null);
/** 列出 snapshot 目录下以 prefix 为前缀的文件的值,并根据函数 reducer 进行处理. */
private @Nullable Long findByListFilesAskwang(
BinaryOperator<Long> reducer, Path dir, String prefix) throws IOException {
return listVersionedFilesAskwang(fileIO, dir, prefix).reduce(reducer).orElse(null);
}

/**
* 列出指定目录下以 prefix 为前缀的文件的值
* listVersionedFiles(fileIO, snapshotDir/schemaDir, snapshotPrefix/schemaPrefix)
* <snapshot-1, snapshot-2, snapshot-3> => Stream<Long> (1,2,3)
* 这个函数的用途挺多:
* - 统计 snapshot 个数
* - snapshot id 及对应的 snapshot
* - schema id 及对应 schema
* 列出指定目录下以 prefix 为前缀的文件的值. listVersionedFiles(fileIO, snapshotDir/schemaDir,
* snapshotPrefix/schemaPrefix). snapshot-1, snapshot-2, snapshot-3 => Stream[Long] (1,2,3).
* 这个函数的用途挺多: - 统计 snapshot 个数 - snapshot id 及对应的 snapshot - schema id 及对应 schema
*/
private Stream<Long> listVersionedFilesAskwang(FileIO fileIO, Path dir, String prefix) throws IOException {
private Stream<Long> listVersionedFilesAskwang(FileIO fileIO, Path dir, String prefix)
throws IOException {
// askwang-done: listStatus 需考虑更多的情况,比如目录不存在、目录为空
FileStatus[] fileStatuses = fileIO.listStatus(dir);
// "EARLIEST".substring("snapshot-".length()); will throw StringIndexOutOfBoundsException, so filter first
// "EARLIEST".substring("snapshot-".length()); will throw StringIndexOutOfBoundsException,
// so filter first
return listVersionedFileStatusAskwang(fileIO, dir, prefix)
.map(FileStatus::getPath)
.map(Path::getName)
.map(name -> name.substring(prefix.length())).map(Long::parseLong);
.map(FileStatus::getPath)
.map(Path::getName)
.map(name -> name.substring(prefix.length()))
.map(Long::parseLong);
}


public Long readHint(String fileName) {
return readHint(fileName, snapshotDirectory());
}
Expand All @@ -670,12 +669,7 @@ public Long readHint(String fileName, Path dir) {
return null;
}

/**
* 思路:
* 1、读取的文件路径名 path
* 2、InputStream 读取对应 path
* 3、原方法 readHint 有两层重试机制,异常处理也更周到。
*/
/** 思路. 1、读取的文件路径名 path 2、InputStream 读取对应 path 3、原方法 readHint 有两层重试机制,异常处理也更周到。 */
public Long readHintAskwang(String fileName) throws IOException {
Path dir = snapshotDirectory();
StringBuilder sBuilder = new StringBuilder();
Expand All @@ -685,8 +679,9 @@ public Long readHintAskwang(String fileName) throws IOException {
}
int retryNum = 0;
while (retryNum++ <= READ_HINT_RETRY_NUM) {
try (InputStream in = fileIO.newInputStream(path)){
BufferedReader br = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
try (InputStream in = fileIO.newInputStream(path)) {
BufferedReader br =
new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
String line;
while ((line = br.readLine()) != null) {
sBuilder.append(line);
Expand Down Expand Up @@ -733,11 +728,7 @@ private void commitHint(long snapshotId, String fileName, Path dir) throws IOExc
fileIO.overwriteFileUtf8(hintFile, String.valueOf(snapshotId));
}

/**
* askwang-start
* snapshot dir; overwrite hdfs file;
*
*/
/** askwang-start. snapshot dir; overwrite hdfs file; */
public void commitEarliestHintAskwang(long snapshotId) throws IOException {
Path path = new Path(snapshotDirectory(), EARLIEST);
String content = String.valueOf(snapshotId);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,67 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.fs.Path;

import org.junit.jupiter.api.Test;

import java.time.Duration;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.function.BinaryOperator;

/** java use. */
public class AskwangJavaITCase {

@Test
public void testBinaryOperator() {
BinaryOperator<Integer> binaryOperator1 = (m, n) -> m + n;
BinaryOperator<Integer> binaryOperator2 = Integer::sum;
Integer sum = binaryOperator2.apply(3, 5);
assert(sum == 8);
}

@Test
public void testPathGetName() {
String SNAPSHOT_PREFIX = "snapshot-";
Path path = new Path("hdfs://ns/warehouse/db/tb/snapshot/snapshot-1");
String name = path.getName();
System.out.println(name);
String index = name.substring("snapshot-".length());
System.out.println(index);

Path path1 = new Path("hdfs://ns/warehouse/db/tb/snapshot/EARLIEST");
// path_length < begin_index => throw StringIndexOutOfBoundsException
String pos = path1.getName().substring(SNAPSHOT_PREFIX.length());
System.out.println(pos);
}

@Test
public void testAddCommitPartitionCache() {
Cache<BinaryRow, Boolean> cache = CacheBuilder.newBuilder()
.expireAfterAccess(30, TimeUnit.MINUTES)
.maximumSize(300)
.softValues()
.build();

// askwang-todo: partition transform to BinaryRow
// paimon-0.9: AddPartitionCommitCallback#addPartition. cache的使用
BinaryRow partition = null;
try {
Boolean added = cache.get(partition, new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
return false;
}
});
if (added) {
// return
}
// client.addPartition(partition);
cache.put(partition, true);
} catch (Exception e) {
throw new RuntimeException(e);
@Test
public void testBinaryOperator() {
BinaryOperator<Integer> binaryOperator1 = (m, n) -> m + n;
BinaryOperator<Integer> binaryOperator2 = Integer::sum;
Integer sum = binaryOperator2.apply(3, 5);
assert (sum == 8);
}
}

@Test
public void testPathGetName() {
Path path = new Path("hdfs://ns/warehouse/db/tb/snapshot/snapshot-1");
String name = path.getName();
System.out.println(name);
String index = name.substring("snapshot-".length());
System.out.println(index);

Path path1 = new Path("hdfs://ns/warehouse/db/tb/snapshot/EARLIEST");
// path_length < begin_index => throw StringIndexOutOfBoundsException
String pos = path1.getName().substring("snapshot-".length());
System.out.println(pos);
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.spark.sql

import org.apache.paimon.spark.PaimonSparkTestBase

import org.apache.spark.sql.Row

class InsertOverwriteTestAskwang extends PaimonSparkTestBase {
Expand All @@ -23,14 +42,10 @@ class InsertOverwriteTestAskwang extends PaimonSparkTestBase {
|""".stripMargin)

spark.sql("INSERT INTO T values (1, 1, '1'), (2, 2, '2')")
checkAnswer(
spark.sql("SELECT * FROM T ORDER BY a, b"),
Row(1, 1, "1") :: Row(2, 2, "2") :: Nil)
checkAnswer(spark.sql("SELECT * FROM T ORDER BY a, b"), Row(1, 1, "1") :: Row(2, 2, "2") :: Nil)

spark.sql("INSERT OVERWRITE T VALUES (1, 3, '3'), (2, 4, '4')");
checkAnswer(
spark.sql("SELECT * FROM T ORDER BY a, b"),
Row(1, 3, "3") :: Row(2, 4, "4") :: Nil)
checkAnswer(spark.sql("SELECT * FROM T ORDER BY a, b"), Row(1, 3, "3") :: Row(2, 4, "4") :: Nil)
}

}
Original file line number Diff line number Diff line change
@@ -1,6 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.spark.sql

import org.apache.paimon.spark.PaimonSparkTestBase

import org.apache.spark.sql.Row

class InsertOverwriteTestAskwang extends PaimonSparkTestBase {
Expand All @@ -23,14 +42,10 @@ class InsertOverwriteTestAskwang extends PaimonSparkTestBase {
|""".stripMargin)

spark.sql("INSERT INTO T values (1, 1, '1'), (2, 2, '2')")
checkAnswer(
spark.sql("SELECT * FROM T ORDER BY a, b"),
Row(1, 1, "1") :: Row(2, 2, "2") :: Nil)
checkAnswer(spark.sql("SELECT * FROM T ORDER BY a, b"), Row(1, 1, "1") :: Row(2, 2, "2") :: Nil)

spark.sql("INSERT OVERWRITE T VALUES (1, 3, '3'), (2, 4, '4')");
checkAnswer(
spark.sql("SELECT * FROM T ORDER BY a, b"),
Row(1, 3, "3") :: Row(2, 4, "4") :: Nil)
checkAnswer(spark.sql("SELECT * FROM T ORDER BY a, b"), Row(1, 3, "3") :: Row(2, 4, "4") :: Nil)
}

}
Loading

0 comments on commit 04c3be2

Please sign in to comment.