Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Core]Support read external path in DataFileMeta #4761

Merged
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ public void close() throws Exception {
for (DataFileMeta file : compactAfter) {
// appendOnlyCompactManager will rewrite the file and no file upgrade will occur, so we
// can directly delete the file in compactAfter.
fileIO.deleteQuietly(pathFactory.toPath(file.fileName()));
fileIO.deleteQuietly(pathFactory.toPath(file));
}

sinkWriter.close();
Expand All @@ -271,7 +271,7 @@ public void toBufferedWriter() throws Exception {
} finally {
// remove small files
for (DataFileMeta file : files) {
fileIO.deleteQuietly(pathFactory.toPath(file.fileName()));
fileIO.deleteQuietly(pathFactory.toPath(file));
}
}
}
Expand Down
18 changes: 10 additions & 8 deletions paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,8 @@ public static DataFileMeta forAppend(
List<String> extraFiles,
neuyilan marked this conversation as resolved.
Show resolved Hide resolved
@Nullable byte[] embeddedIndex,
@Nullable FileSource fileSource,
@Nullable List<String> valueStatsCols) {
@Nullable List<String> valueStatsCols,
@Nullable String externalPath) {
return new DataFileMeta(
fileName,
fileSize,
Expand All @@ -154,7 +155,7 @@ public static DataFileMeta forAppend(
embeddedIndex,
fileSource,
valueStatsCols,
null);
externalPath);
}

public DataFileMeta(
Expand All @@ -173,7 +174,8 @@ public DataFileMeta(
@Nullable Long deleteRowCount,
@Nullable byte[] embeddedIndex,
@Nullable FileSource fileSource,
@Nullable List<String> valueStatsCols) {
@Nullable List<String> valueStatsCols,
@Nullable String externalPath) {
this(
fileName,
fileSize,
Expand All @@ -192,7 +194,7 @@ public DataFileMeta(
embeddedIndex,
fileSource,
valueStatsCols,
null);
externalPath);
}

public DataFileMeta(
Expand Down Expand Up @@ -403,7 +405,7 @@ public DataFileMeta upgrade(int newLevel) {
externalPath);
}

public DataFileMeta rename(String newFileName) {
public DataFileMeta rename(String newExternalPath, String newFileName) {
return new DataFileMeta(
newFileName,
fileSize,
Expand All @@ -422,7 +424,7 @@ public DataFileMeta rename(String newFileName) {
embeddedIndex,
fileSource,
valueStatsCols,
externalPath);
newExternalPath);
}

public DataFileMeta copyWithoutStats() {
Expand All @@ -449,8 +451,8 @@ public DataFileMeta copyWithoutStats() {

public List<Path> collectFiles(DataFilePathFactory pathFactory) {
List<Path> paths = new ArrayList<>();
paths.add(pathFactory.toPath(fileName));
extraFiles.forEach(f -> paths.add(pathFactory.toPath(f)));
paths.add(pathFactory.toPath(this));
extraFiles.forEach(f -> paths.add(pathFactory.toExtraFilePath(this, f)));
return paths;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
import static org.apache.paimon.utils.SerializationUtils.newStringType;
import static org.apache.paimon.utils.SerializationUtils.serializeBinaryRow;

/** Serializer for {@link DataFileMeta} with 0.9 version. */
/** Serializer for {@link DataFileMeta} with 1.0 snapshot version. */
public class DataFileMeta10LegacySerializer implements Serializable {

private static final long serialVersionUID = 1L;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,39 @@ private Path newPath(String prefix) {
return new Path(parent, name);
}

@VisibleForTesting
public Path toPath(String fileName) {
return new Path(parent + "/" + fileName);
}

/**
* for read purpose.
*
* @param fileName the file name
* @param externalPath the external path, if null, it will use the parent path
* @return the file's path
*/
public Path toPath(String fileName, String externalPath) {
neuyilan marked this conversation as resolved.
Show resolved Hide resolved
if (externalPath == null) {
return new Path(parent + "/" + fileName);
}
return new Path(externalPath + "/" + fileName);
}

public Path toPath(DataFileMeta dataFileMeta) {
neuyilan marked this conversation as resolved.
Show resolved Hide resolved
if (dataFileMeta.externalPath() == null) {
return new Path(parent + "/" + dataFileMeta.fileName());
}
return new Path(dataFileMeta.externalPath() + "/" + dataFileMeta.fileName());
}

public Path toExtraFilePath(DataFileMeta dataFileMeta, String extraFile) {
if (dataFileMeta.externalPath() == null) {
return new Path(parent + "/" + extraFile);
}
return new Path(dataFileMeta.externalPath() + "/" + extraFile);
}

@VisibleForTesting
public String uuid() {
return uuid;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public static FileIndexResult evaluate(
// go to file index check
try (FileIndexPredicate predicate =
new FileIndexPredicate(
dataFilePathFactory.toPath(indexFiles.get(0)),
dataFilePathFactory.toExtraFilePath(file, indexFiles.get(0)),
fileIO,
dataSchema.logicalRowType())) {
return predicate.evaluate(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,8 @@ public DataFileMeta result() throws IOException {
deleteRecordCount,
indexResult.embeddedIndexBytes(),
fileSource,
valueStatsPair.getKey());
valueStatsPair.getKey(),
null);
neuyilan marked this conversation as resolved.
Show resolved Hide resolved
}

abstract Pair<SimpleColStats[], SimpleColStats[]> fetchKeyValueStats(SimpleColStats[] rowStats);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.paimon.CoreOptions;
import org.apache.paimon.KeyValue;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.deletionvectors.ApplyDeletionVectorReader;
Expand Down Expand Up @@ -97,16 +98,25 @@ private KeyValueFileReaderFactory(

@Override
public RecordReader<KeyValue> createRecordReader(DataFileMeta file) throws IOException {
return createRecordReader(file.schemaId(), file.fileName(), file.fileSize(), file.level());
return createRecordReader(
file.schemaId(),
file.fileName(),
file.fileSize(),
file.level(),
file.externalPath());
}

@VisibleForTesting
public RecordReader<KeyValue> createRecordReader(
long schemaId, String fileName, long fileSize, int level) throws IOException {
long schemaId, String fileName, long fileSize, int level, String externalPath)
throws IOException {
if (fileSize >= asyncThreshold && fileName.endsWith(".orc")) {
return new AsyncRecordReader<>(
() -> createRecordReader(schemaId, fileName, level, false, 2, fileSize));
() ->
createRecordReader(
schemaId, fileName, level, false, 2, fileSize, externalPath));
}
return createRecordReader(schemaId, fileName, level, true, null, fileSize);
return createRecordReader(schemaId, fileName, level, true, null, fileSize, externalPath);
}

private FileRecordReader<KeyValue> createRecordReader(
Expand All @@ -115,7 +125,8 @@ private FileRecordReader<KeyValue> createRecordReader(
int level,
boolean reuseFormat,
@Nullable Integer orcPoolSize,
long fileSize)
long fileSize,
String externalPath)
throws IOException {
String formatIdentifier = DataFilePathFactory.formatIdentifier(fileName);

Expand All @@ -132,7 +143,7 @@ private FileRecordReader<KeyValue> createRecordReader(
new FormatKey(schemaId, formatIdentifier),
key -> formatSupplier.get())
: formatSupplier.get();
Path filePath = pathFactory.toPath(fileName);
Path filePath = pathFactory.toPath(fileName, externalPath);

FileRecordReader<InternalRow> fileRecordReader =
new DataFileRecordReader(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,14 +142,14 @@ private KeyValueDataFileWriter createDataFileWriter(
fileIndexOptions);
}

public void deleteFile(String filename, int level) {
fileIO.deleteQuietly(formatContext.pathFactory(level).toPath(filename));
public void deleteFile(DataFileMeta meta, int level) {
fileIO.deleteQuietly(formatContext.pathFactory(level).toPath(meta));
}

public void copyFile(String sourceFileName, String targetFileName, int level)
public void copyFile(DataFileMeta sourceMeta, DataFileMeta targetMeta, int level)
throws IOException {
Path sourcePath = formatContext.pathFactory(level).toPath(sourceFileName);
Path targetPath = formatContext.pathFactory(level).toPath(targetFileName);
Path sourcePath = formatContext.pathFactory(level).toPath(sourceMeta);
Path targetPath = formatContext.pathFactory(level).toPath(targetMeta);
fileIO.copyFile(sourcePath, targetPath, true);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ public DataFileMeta result() throws IOException {
: Collections.singletonList(indexResult.independentIndexFile()),
indexResult.embeddedIndexBytes(),
fileSource,
statsPair.getKey());
statsPair.getKey(),
null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,19 @@ public ExpireFileEntry(
@Nullable byte[] embeddedIndex,
BinaryRow minKey,
BinaryRow maxKey,
@Nullable FileSource fileSource) {
super(kind, partition, bucket, level, fileName, extraFiles, embeddedIndex, minKey, maxKey);
@Nullable FileSource fileSource,
@Nullable String externalPath) {
super(
kind,
partition,
bucket,
level,
fileName,
extraFiles,
embeddedIndex,
minKey,
maxKey,
externalPath);
this.fileSource = fileSource;
}

Expand All @@ -61,7 +72,8 @@ public static ExpireFileEntry from(ManifestEntry entry) {
entry.file().embeddedIndex(),
entry.minKey(),
entry.maxKey(),
entry.file().fileSource().orElse(null));
entry.file().fileSource().orElse(null),
entry.externalPath());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ public interface FileEntry {

String fileName();

String externalPath();
neuyilan marked this conversation as resolved.
Show resolved Hide resolved

Identifier identifier();

BinaryRow minKey();
Expand All @@ -73,6 +75,7 @@ class Identifier {
public final String fileName;
public final List<String> extraFiles;
@Nullable private final byte[] embeddedIndex;
@Nullable public final String externalPath;

/* Cache the hash code for the string */
private Integer hash;
Expand All @@ -83,13 +86,15 @@ public Identifier(
int level,
String fileName,
List<String> extraFiles,
@Nullable byte[] embeddedIndex) {
@Nullable byte[] embeddedIndex,
@Nullable String externalPath) {
this.partition = partition;
this.bucket = bucket;
this.level = level;
this.fileName = fileName;
this.extraFiles = extraFiles;
this.embeddedIndex = embeddedIndex;
this.externalPath = externalPath;
}

@Override
Expand All @@ -106,7 +111,8 @@ public boolean equals(Object o) {
&& Objects.equals(partition, that.partition)
&& Objects.equals(fileName, that.fileName)
&& Objects.equals(extraFiles, that.extraFiles)
&& Objects.deepEquals(embeddedIndex, that.embeddedIndex);
&& Objects.deepEquals(embeddedIndex, that.embeddedIndex)
&& Objects.deepEquals(externalPath, that.externalPath);
}

@Override
Expand All @@ -119,7 +125,8 @@ public int hashCode() {
level,
fileName,
extraFiles,
Arrays.hashCode(embeddedIndex));
Arrays.hashCode(embeddedIndex),
externalPath);
}
return hash;
}
Expand All @@ -138,6 +145,8 @@ public String toString() {
+ extraFiles
+ ", embeddedIndex="
+ Arrays.toString(embeddedIndex)
+ ", externalPath="
+ externalPath
+ '}';
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,11 @@ public String fileName() {
return file.fileName();
}

@Override
public String externalPath() {
return file.externalPath();
}

@Override
public BinaryRow minKey() {
return file.minKey();
Expand Down Expand Up @@ -123,7 +128,8 @@ public Identifier identifier() {
file.level(),
file.fileName(),
file.extraFiles(),
file.embeddedIndex());
file.embeddedIndex(),
file.externalPath());
}

public ManifestEntry copyWithoutStats() {
Expand Down
Loading
Loading