Skip to content

Commit

Permalink
[core] External Path in DataFileMeta should be the file path (#4766)
Browse files Browse the repository at this point in the history
  • Loading branch information
JingsongLi committed Dec 24, 2024
1 parent ea176b9 commit 1624678
Show file tree
Hide file tree
Showing 29 changed files with 157 additions and 183 deletions.
16 changes: 11 additions & 5 deletions paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
Original file line number Diff line number Diff line change
Expand Up @@ -368,9 +368,14 @@ public String fileFormat() {
return split[split.length - 1];
}

@Nullable
public String externalPath() {
return externalPath;
public Optional<String> externalPath() {
return Optional.ofNullable(externalPath);
}

public Optional<String> externalPathDir() {
return Optional.ofNullable(externalPath)
.map(Path::new)
.map(p -> p.getParent().toUri().toString());
}

public Optional<FileSource> fileSource() {
Expand Down Expand Up @@ -405,7 +410,8 @@ public DataFileMeta upgrade(int newLevel) {
externalPath);
}

public DataFileMeta rename(String newExternalPath, String newFileName) {
public DataFileMeta rename(String newFileName) {
String newExternalPath = externalPathDir().map(dir -> dir + "/" + newFileName).orElse(null);
return new DataFileMeta(
newFileName,
fileSize,
Expand Down Expand Up @@ -452,7 +458,7 @@ public DataFileMeta copyWithoutStats() {
public List<Path> collectFiles(DataFilePathFactory pathFactory) {
List<Path> paths = new ArrayList<>();
paths.add(pathFactory.toPath(this));
extraFiles.forEach(f -> paths.add(pathFactory.toExtraFilePath(this, f)));
extraFiles.forEach(f -> paths.add(pathFactory.toAlignedPath(f, this)));
return paths;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public InternalRow toRow(DataFileMeta meta) {
meta.embeddedIndex(),
meta.fileSource().map(FileSource::toByteValue).orElse(null),
toStringArrayData(meta.valueStatsCols()),
BinaryString.fromString(meta.externalPath()));
meta.externalPath().map(BinaryString::fromString).orElse(null));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@

import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.fs.Path;
import org.apache.paimon.manifest.FileEntry;

import javax.annotation.concurrent.ThreadSafe;

import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;

Expand Down Expand Up @@ -67,47 +69,36 @@ public Path newChangelogPath() {
return newPath(changelogFilePrefix);
}

private Path newPath(String prefix) {
public String newChangelogFileName() {
return newFileName(changelogFilePrefix);
}

public Path newPath(String prefix) {
return new Path(parent, newFileName(prefix));
}

private String newFileName(String prefix) {
String extension;
if (fileSuffixIncludeCompression) {
extension = "." + fileCompression + "." + formatIdentifier;
} else {
extension = "." + formatIdentifier;
}
String name = prefix + uuid + "-" + pathCount.getAndIncrement() + extension;
return new Path(parent, name);
return prefix + uuid + "-" + pathCount.getAndIncrement() + extension;
}

@VisibleForTesting
public Path toPath(String fileName) {
return new Path(parent + "/" + fileName);
}

/**
* for read purpose.
*
* @param fileName the file name
* @param externalPath the external path, if null, it will use the parent path
* @return the file's path
*/
public Path toPath(String fileName, String externalPath) {
return new Path((externalPath == null ? parent : externalPath) + "/" + fileName);
public Path toPath(DataFileMeta file) {
return file.externalPath().map(Path::new).orElse(new Path(parent, file.fileName()));
}

public Path toPath(DataFileMeta dataFileMeta) {
String externalPath = dataFileMeta.externalPath();
String fileName = dataFileMeta.fileName();
return new Path((externalPath == null ? parent : externalPath) + "/" + fileName);
public Path toPath(FileEntry file) {
return Optional.ofNullable(file.externalPath())
.map(Path::new)
.orElse(new Path(parent, file.fileName()));
}

public Path toExtraFilePath(DataFileMeta dataFileMeta, String extraFile) {
String externalPath = dataFileMeta.externalPath();
return new Path((externalPath == null ? parent : externalPath) + "/" + extraFile);
}

@VisibleForTesting
public String uuid() {
return uuid;
public Path toAlignedPath(String fileName, DataFileMeta aligned) {
return new Path(aligned.externalPathDir().map(Path::new).orElse(parent), fileName);
}

public static Path dataFileToFileIndexPath(Path dataFilePath) {
Expand Down Expand Up @@ -141,4 +132,9 @@ public static String formatIdentifier(String fileName) {

return fileName.substring(index + 1);
}

@VisibleForTesting
String uuid() {
return uuid;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public static FileIndexResult evaluate(
// go to file index check
try (FileIndexPredicate predicate =
new FileIndexPredicate(
dataFilePathFactory.toExtraFilePath(file, indexFiles.get(0)),
dataFilePathFactory.toAlignedPath(indexFiles.get(0), file),
fileIO,
dataSchema.logicalRowType())) {
return predicate.evaluate(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import org.apache.paimon.CoreOptions;
import org.apache.paimon.KeyValue;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.deletionvectors.ApplyDeletionVectorReader;
Expand Down Expand Up @@ -98,37 +97,17 @@ private KeyValueFileReaderFactory(

@Override
public RecordReader<KeyValue> createRecordReader(DataFileMeta file) throws IOException {
return createRecordReader(
file.schemaId(),
file.fileName(),
file.fileSize(),
file.level(),
file.externalPath());
}

@VisibleForTesting
public RecordReader<KeyValue> createRecordReader(
long schemaId, String fileName, long fileSize, int level, String externalPath)
throws IOException {
if (fileSize >= asyncThreshold && fileName.endsWith(".orc")) {
return new AsyncRecordReader<>(
() ->
createRecordReader(
schemaId, fileName, level, false, 2, fileSize, externalPath));
if (file.fileSize() >= asyncThreshold && file.fileName().endsWith(".orc")) {
return new AsyncRecordReader<>(() -> createRecordReader(file, false, 2));
}
return createRecordReader(schemaId, fileName, level, true, null, fileSize, externalPath);
return createRecordReader(file, true, null);
}

private FileRecordReader<KeyValue> createRecordReader(
long schemaId,
String fileName,
int level,
boolean reuseFormat,
@Nullable Integer orcPoolSize,
long fileSize,
String externalPath)
DataFileMeta file, boolean reuseFormat, @Nullable Integer orcPoolSize)
throws IOException {
String formatIdentifier = DataFilePathFactory.formatIdentifier(fileName);
String formatIdentifier = DataFilePathFactory.formatIdentifier(file.fileName());
long schemaId = file.schemaId();

Supplier<FormatReaderMapping> formatSupplier =
() ->
Expand All @@ -143,8 +122,9 @@ private FileRecordReader<KeyValue> createRecordReader(
new FormatKey(schemaId, formatIdentifier),
key -> formatSupplier.get())
: formatSupplier.get();
Path filePath = pathFactory.toPath(fileName, externalPath);
Path filePath = pathFactory.toPath(file);

long fileSize = file.fileSize();
FileRecordReader<InternalRow> fileRecordReader =
new DataFileRecordReader(
formatReaderMapping.getReaderFactory(),
Expand All @@ -156,13 +136,13 @@ private FileRecordReader<KeyValue> createRecordReader(
formatReaderMapping.getCastMapping(),
PartitionUtils.create(formatReaderMapping.getPartitionPair(), partition));

Optional<DeletionVector> deletionVector = dvFactory.create(fileName);
Optional<DeletionVector> deletionVector = dvFactory.create(file.fileName());
if (deletionVector.isPresent() && !deletionVector.get().isEmpty()) {
fileRecordReader =
new ApplyDeletionVectorReader(fileRecordReader, deletionVector.get());
}

return new KeyValueDataFileRecordReader(fileRecordReader, keyType, valueType, level);
return new KeyValueDataFileRecordReader(fileRecordReader, keyType, valueType, file.level());
}

public static Builder builder(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,23 +142,22 @@ private KeyValueDataFileWriter createDataFileWriter(
fileIndexOptions);
}

public void deleteFile(DataFileMeta meta, int level) {
fileIO.deleteQuietly(formatContext.pathFactory(level).toPath(meta));
public void deleteFile(DataFileMeta file) {
fileIO.deleteQuietly(formatContext.pathFactory(file.level()).toPath(file));
}

public void copyFile(DataFileMeta sourceMeta, DataFileMeta targetMeta, int level)
throws IOException {
Path sourcePath = formatContext.pathFactory(level).toPath(sourceMeta);
Path targetPath = formatContext.pathFactory(level).toPath(targetMeta);
public void copyFile(DataFileMeta sourceFile, DataFileMeta targetFile) throws IOException {
Path sourcePath = formatContext.pathFactory(sourceFile.level()).toPath(sourceFile);
Path targetPath = formatContext.pathFactory(targetFile.level()).toPath(targetFile);
fileIO.copyFile(sourcePath, targetPath, true);
}

public FileIO getFileIO() {
return fileIO;
}

public Path newChangelogPath(int level) {
return formatContext.pathFactory(level).newChangelogPath();
public String newChangelogFileName(int level) {
return formatContext.pathFactory(level).newChangelogFileName();
}

public static Builder builder(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ public interface FileEntry {

String fileName();

@Nullable
String externalPath();

Identifier identifier();
Expand Down Expand Up @@ -161,7 +162,9 @@ public String toString(FileStorePathFactory pathFactory) {
+ ", extraFiles "
+ extraFiles
+ ", embeddedIndex "
+ Arrays.toString(embeddedIndex);
+ Arrays.toString(embeddedIndex)
+ ", externalPath "
+ externalPath;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import org.apache.paimon.types.RowType;
import org.apache.paimon.types.TinyIntType;

import javax.annotation.Nullable;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
Expand Down Expand Up @@ -92,9 +94,10 @@ public String fileName() {
return file.fileName();
}

@Nullable
@Override
public String externalPath() {
return file.externalPath();
return file.externalPath().orElse(null);
}

@Override
Expand Down Expand Up @@ -129,7 +132,7 @@ public Identifier identifier() {
file.fileName(),
file.extraFiles(),
file.embeddedIndex(),
file.externalPath());
externalPath());
}

public ManifestEntry copyWithoutStats() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ public String fileName() {
return fileName;
}

@Nullable
@Override
public String externalPath() {
return externalPath;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.apache.paimon.compression.CompressOptions;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.fs.Path;
import org.apache.paimon.io.CompactIncrement;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataIncrement;
Expand Down Expand Up @@ -242,10 +241,9 @@ private void flushWriteBuffer(boolean waitForLatestCompaction, boolean forcedFul
} else if (changelogProducer == ChangelogProducer.INPUT && isInsertOnly) {
List<DataFileMeta> changelogMetas = new ArrayList<>();
for (DataFileMeta dataMeta : dataMetas) {
Path newPath = writerFactory.newChangelogPath(0);
DataFileMeta changelogMeta =
dataMeta.rename(newPath.getParent().getName(), newPath.getName());
writerFactory.copyFile(dataMeta, changelogMeta, 0);
String newFileName = writerFactory.newChangelogFileName(0);
DataFileMeta changelogMeta = dataMeta.rename(newFileName);
writerFactory.copyFile(dataMeta, changelogMeta);
changelogMetas.add(changelogMeta);
}
newFilesChangelog.addAll(changelogMetas);
Expand Down Expand Up @@ -343,7 +341,7 @@ private void updateCompactResult(CompactResult result) {
// 2. This file is not the input of upgraded.
if (!compactBefore.containsKey(file.fileName())
&& !afterFiles.contains(file.fileName())) {
writerFactory.deleteFile(file, file.level());
writerFactory.deleteFile(file);
}
} else {
compactBefore.put(file.fileName(), file);
Expand Down Expand Up @@ -377,7 +375,7 @@ public void close() throws Exception {
deletedFiles.clear();

for (DataFileMeta file : newFilesChangelog) {
writerFactory.deleteFile(file, file.level());
writerFactory.deleteFile(file);
}
newFilesChangelog.clear();

Expand All @@ -392,12 +390,12 @@ public void close() throws Exception {
compactAfter.clear();

for (DataFileMeta file : compactChangelog) {
writerFactory.deleteFile(file, file.level());
writerFactory.deleteFile(file);
}
compactChangelog.clear();

for (DataFileMeta file : delete) {
writerFactory.deleteFile(file, file.level());
writerFactory.deleteFile(file);
}

if (compactDeletionFile != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,10 +180,8 @@ public Optional<List<RawFile>> convertToRawFiles() {
}

private RawFile makeRawTableFile(String bucketPath, DataFileMeta file) {
String path = file.externalPath() != null ? file.externalPath() : bucketPath;
path += "/" + file.fileName();
return new RawFile(
path,
file.externalPath().orElse(bucketPath + "/" + file.fileName()),
file.fileSize(),
0,
file.fileSize(),
Expand Down
Loading

0 comments on commit 1624678

Please sign in to comment.