Skip to content

Commit

Permalink
[core] Support read external path in DataFileMeta (#4761)
Browse files Browse the repository at this point in the history
  • Loading branch information
neuyilan authored Dec 24, 2024
1 parent 718680f commit 0ae0c02
Show file tree
Hide file tree
Showing 32 changed files with 176 additions and 145 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ public void close() throws Exception {
for (DataFileMeta file : compactAfter) {
// appendOnlyCompactManager will rewrite the file and no file upgrade will occur, so we
// can directly delete the file in compactAfter.
fileIO.deleteQuietly(pathFactory.toPath(file.fileName()));
fileIO.deleteQuietly(pathFactory.toPath(file));
}

sinkWriter.close();
Expand All @@ -271,7 +271,7 @@ public void toBufferedWriter() throws Exception {
} finally {
// remove small files
for (DataFileMeta file : files) {
fileIO.deleteQuietly(pathFactory.toPath(file.fileName()));
fileIO.deleteQuietly(pathFactory.toPath(file));
}
}
}
Expand Down
18 changes: 10 additions & 8 deletions paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,8 @@ public static DataFileMeta forAppend(
List<String> extraFiles,
@Nullable byte[] embeddedIndex,
@Nullable FileSource fileSource,
@Nullable List<String> valueStatsCols) {
@Nullable List<String> valueStatsCols,
@Nullable String externalPath) {
return new DataFileMeta(
fileName,
fileSize,
Expand All @@ -154,7 +155,7 @@ public static DataFileMeta forAppend(
embeddedIndex,
fileSource,
valueStatsCols,
null);
externalPath);
}

public DataFileMeta(
Expand All @@ -173,7 +174,8 @@ public DataFileMeta(
@Nullable Long deleteRowCount,
@Nullable byte[] embeddedIndex,
@Nullable FileSource fileSource,
@Nullable List<String> valueStatsCols) {
@Nullable List<String> valueStatsCols,
@Nullable String externalPath) {
this(
fileName,
fileSize,
Expand All @@ -192,7 +194,7 @@ public DataFileMeta(
embeddedIndex,
fileSource,
valueStatsCols,
null);
externalPath);
}

public DataFileMeta(
Expand Down Expand Up @@ -403,7 +405,7 @@ public DataFileMeta upgrade(int newLevel) {
externalPath);
}

public DataFileMeta rename(String newFileName) {
public DataFileMeta rename(String newExternalPath, String newFileName) {
return new DataFileMeta(
newFileName,
fileSize,
Expand All @@ -422,7 +424,7 @@ public DataFileMeta rename(String newFileName) {
embeddedIndex,
fileSource,
valueStatsCols,
externalPath);
newExternalPath);
}

public DataFileMeta copyWithoutStats() {
Expand All @@ -449,8 +451,8 @@ public DataFileMeta copyWithoutStats() {

public List<Path> collectFiles(DataFilePathFactory pathFactory) {
List<Path> paths = new ArrayList<>();
paths.add(pathFactory.toPath(fileName));
extraFiles.forEach(f -> paths.add(pathFactory.toPath(f)));
paths.add(pathFactory.toPath(this));
extraFiles.forEach(f -> paths.add(pathFactory.toExtraFilePath(this, f)));
return paths;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
import static org.apache.paimon.utils.SerializationUtils.newStringType;
import static org.apache.paimon.utils.SerializationUtils.serializeBinaryRow;

/** Serializer for {@link DataFileMeta} with 0.9 version. */
/** Serializer for {@link DataFileMeta} with 1.0 snapshot version. */
public class DataFileMeta10LegacySerializer implements Serializable {

private static final long serialVersionUID = 1L;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,33 @@ private Path newPath(String prefix) {
return new Path(parent, name);
}

@VisibleForTesting
public Path toPath(String fileName) {
return new Path(parent + "/" + fileName);
}

/**
* for read purpose.
*
* @param fileName the file name
* @param externalPath the external path, if null, it will use the parent path
* @return the file's path
*/
public Path toPath(String fileName, String externalPath) {
return new Path((externalPath == null ? parent : externalPath) + "/" + fileName);
}

public Path toPath(DataFileMeta dataFileMeta) {
String externalPath = dataFileMeta.externalPath();
String fileName = dataFileMeta.fileName();
return new Path((externalPath == null ? parent : externalPath) + "/" + fileName);
}

public Path toExtraFilePath(DataFileMeta dataFileMeta, String extraFile) {
String externalPath = dataFileMeta.externalPath();
return new Path((externalPath == null ? parent : externalPath) + "/" + extraFile);
}

@VisibleForTesting
public String uuid() {
return uuid;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public static FileIndexResult evaluate(
// go to file index check
try (FileIndexPredicate predicate =
new FileIndexPredicate(
dataFilePathFactory.toPath(indexFiles.get(0)),
dataFilePathFactory.toExtraFilePath(file, indexFiles.get(0)),
fileIO,
dataSchema.logicalRowType())) {
return predicate.evaluate(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,8 @@ public DataFileMeta result() throws IOException {
deleteRecordCount,
indexResult.embeddedIndexBytes(),
fileSource,
valueStatsPair.getKey());
valueStatsPair.getKey(),
null);
}

abstract Pair<SimpleColStats[], SimpleColStats[]> fetchKeyValueStats(SimpleColStats[] rowStats);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.paimon.CoreOptions;
import org.apache.paimon.KeyValue;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.deletionvectors.ApplyDeletionVectorReader;
Expand Down Expand Up @@ -97,16 +98,25 @@ private KeyValueFileReaderFactory(

@Override
public RecordReader<KeyValue> createRecordReader(DataFileMeta file) throws IOException {
return createRecordReader(file.schemaId(), file.fileName(), file.fileSize(), file.level());
return createRecordReader(
file.schemaId(),
file.fileName(),
file.fileSize(),
file.level(),
file.externalPath());
}

@VisibleForTesting
public RecordReader<KeyValue> createRecordReader(
long schemaId, String fileName, long fileSize, int level) throws IOException {
long schemaId, String fileName, long fileSize, int level, String externalPath)
throws IOException {
if (fileSize >= asyncThreshold && fileName.endsWith(".orc")) {
return new AsyncRecordReader<>(
() -> createRecordReader(schemaId, fileName, level, false, 2, fileSize));
() ->
createRecordReader(
schemaId, fileName, level, false, 2, fileSize, externalPath));
}
return createRecordReader(schemaId, fileName, level, true, null, fileSize);
return createRecordReader(schemaId, fileName, level, true, null, fileSize, externalPath);
}

private FileRecordReader<KeyValue> createRecordReader(
Expand All @@ -115,7 +125,8 @@ private FileRecordReader<KeyValue> createRecordReader(
int level,
boolean reuseFormat,
@Nullable Integer orcPoolSize,
long fileSize)
long fileSize,
String externalPath)
throws IOException {
String formatIdentifier = DataFilePathFactory.formatIdentifier(fileName);

Expand All @@ -132,7 +143,7 @@ private FileRecordReader<KeyValue> createRecordReader(
new FormatKey(schemaId, formatIdentifier),
key -> formatSupplier.get())
: formatSupplier.get();
Path filePath = pathFactory.toPath(fileName);
Path filePath = pathFactory.toPath(fileName, externalPath);

FileRecordReader<InternalRow> fileRecordReader =
new DataFileRecordReader(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,14 +142,14 @@ private KeyValueDataFileWriter createDataFileWriter(
fileIndexOptions);
}

public void deleteFile(String filename, int level) {
fileIO.deleteQuietly(formatContext.pathFactory(level).toPath(filename));
public void deleteFile(DataFileMeta meta, int level) {
fileIO.deleteQuietly(formatContext.pathFactory(level).toPath(meta));
}

public void copyFile(String sourceFileName, String targetFileName, int level)
public void copyFile(DataFileMeta sourceMeta, DataFileMeta targetMeta, int level)
throws IOException {
Path sourcePath = formatContext.pathFactory(level).toPath(sourceFileName);
Path targetPath = formatContext.pathFactory(level).toPath(targetFileName);
Path sourcePath = formatContext.pathFactory(level).toPath(sourceMeta);
Path targetPath = formatContext.pathFactory(level).toPath(targetMeta);
fileIO.copyFile(sourcePath, targetPath, true);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ public DataFileMeta result() throws IOException {
: Collections.singletonList(indexResult.independentIndexFile()),
indexResult.embeddedIndexBytes(),
fileSource,
statsPair.getKey());
statsPair.getKey(),
null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,19 @@ public ExpireFileEntry(
@Nullable byte[] embeddedIndex,
BinaryRow minKey,
BinaryRow maxKey,
@Nullable FileSource fileSource) {
super(kind, partition, bucket, level, fileName, extraFiles, embeddedIndex, minKey, maxKey);
@Nullable FileSource fileSource,
@Nullable String externalPath) {
super(
kind,
partition,
bucket,
level,
fileName,
extraFiles,
embeddedIndex,
minKey,
maxKey,
externalPath);
this.fileSource = fileSource;
}

Expand All @@ -61,7 +72,8 @@ public static ExpireFileEntry from(ManifestEntry entry) {
entry.file().embeddedIndex(),
entry.minKey(),
entry.maxKey(),
entry.file().fileSource().orElse(null));
entry.file().fileSource().orElse(null),
entry.externalPath());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ public interface FileEntry {

String fileName();

String externalPath();

Identifier identifier();

BinaryRow minKey();
Expand All @@ -73,6 +75,7 @@ class Identifier {
public final String fileName;
public final List<String> extraFiles;
@Nullable private final byte[] embeddedIndex;
@Nullable public final String externalPath;

/* Cache the hash code for the string */
private Integer hash;
Expand All @@ -83,13 +86,15 @@ public Identifier(
int level,
String fileName,
List<String> extraFiles,
@Nullable byte[] embeddedIndex) {
@Nullable byte[] embeddedIndex,
@Nullable String externalPath) {
this.partition = partition;
this.bucket = bucket;
this.level = level;
this.fileName = fileName;
this.extraFiles = extraFiles;
this.embeddedIndex = embeddedIndex;
this.externalPath = externalPath;
}

@Override
Expand All @@ -106,7 +111,8 @@ public boolean equals(Object o) {
&& Objects.equals(partition, that.partition)
&& Objects.equals(fileName, that.fileName)
&& Objects.equals(extraFiles, that.extraFiles)
&& Objects.deepEquals(embeddedIndex, that.embeddedIndex);
&& Objects.deepEquals(embeddedIndex, that.embeddedIndex)
&& Objects.deepEquals(externalPath, that.externalPath);
}

@Override
Expand All @@ -119,7 +125,8 @@ public int hashCode() {
level,
fileName,
extraFiles,
Arrays.hashCode(embeddedIndex));
Arrays.hashCode(embeddedIndex),
externalPath);
}
return hash;
}
Expand All @@ -138,6 +145,8 @@ public String toString() {
+ extraFiles
+ ", embeddedIndex="
+ Arrays.toString(embeddedIndex)
+ ", externalPath="
+ externalPath
+ '}';
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,11 @@ public String fileName() {
return file.fileName();
}

@Override
public String externalPath() {
return file.externalPath();
}

@Override
public BinaryRow minKey() {
return file.minKey();
Expand Down Expand Up @@ -123,7 +128,8 @@ public Identifier identifier() {
file.level(),
file.fileName(),
file.extraFiles(),
file.embeddedIndex());
file.embeddedIndex(),
file.externalPath());
}

public ManifestEntry copyWithoutStats() {
Expand Down
Loading

0 comments on commit 0ae0c02

Please sign in to comment.