Skip to content

Commit

Permalink
new writen files will be persisted with externalPath propertity
Browse files Browse the repository at this point in the history
  • Loading branch information
neuyilan committed Dec 23, 2024
1 parent 82cda96 commit f769ada
Show file tree
Hide file tree
Showing 9 changed files with 50 additions and 10 deletions.
32 changes: 28 additions & 4 deletions paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,8 @@ public static DataFileMeta forAppend(
List<String> extraFiles,
@Nullable byte[] embeddedIndex,
@Nullable FileSource fileSource,
@Nullable List<String> valueStatsCols) {
@Nullable List<String> valueStatsCols,
@Nullable String externalPath) {
return new DataFileMeta(
fileName,
fileSize,
Expand All @@ -154,7 +155,7 @@ public static DataFileMeta forAppend(
embeddedIndex,
fileSource,
valueStatsCols,
null);
externalPath);
}

public DataFileMeta(
Expand All @@ -173,7 +174,8 @@ public DataFileMeta(
@Nullable Long deleteRowCount,
@Nullable byte[] embeddedIndex,
@Nullable FileSource fileSource,
@Nullable List<String> valueStatsCols) {
@Nullable List<String> valueStatsCols,
@Nullable String externalPath) {
this(
fileName,
fileSize,
Expand All @@ -192,7 +194,7 @@ public DataFileMeta(
embeddedIndex,
fileSource,
valueStatsCols,
null);
externalPath);
}

public DataFileMeta(
Expand Down Expand Up @@ -498,6 +500,28 @@ public DataFileMeta copy(byte[] newEmbeddedIndex) {
externalPath);
}

public DataFileMeta copy(String externalPath) {
return new DataFileMeta(
fileName,
fileSize,
rowCount,
minKey,
maxKey,
keyStats,
valueStats,
minSequenceNumber,
maxSequenceNumber,
schemaId,
level,
extraFiles,
creationTime,
deleteRowCount,
embeddedIndex,
fileSource,
valueStatsCols,
externalPath);
}

@Override
public boolean equals(Object o) {
if (o == this) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,8 @@ public DataFileMeta result() throws IOException {
deleteRecordCount,
indexResult.embeddedIndexBytes(),
fileSource,
valueStatsPair.getKey());
valueStatsPair.getKey(),
path.getParent().toString());
}

abstract Pair<SimpleColStats[], SimpleColStats[]> fetchKeyValueStats(SimpleColStats[] rowStats);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ public DataFileMeta result() throws IOException {
: Collections.singletonList(indexResult.independentIndexFile()),
indexResult.embeddedIndexBytes(),
fileSource,
statsPair.getKey());
statsPair.getKey(),
path.getParent().toString());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,8 @@ private static DataFileMeta constructFileMeta(
newPath,
simpleStatsExtractor,
fileIO,
table);
table,
newPath.getParent().toString());
} catch (IOException e) {
throw new RuntimeException("error when construct file meta", e);
}
Expand All @@ -150,7 +151,8 @@ private static DataFileMeta constructFileMeta(
Path path,
SimpleStatsExtractor simpleStatsExtractor,
FileIO fileIO,
Table table)
Table table,
String externalPath)
throws IOException {
SimpleStatsConverter statsArraySerializer = new SimpleStatsConverter(table.rowType());

Expand All @@ -169,7 +171,8 @@ private static DataFileMeta constructFileMeta(
Collections.emptyList(),
null,
FileSource.APPEND,
null);
null,
externalPath);
}

public static BinaryRow writePartitionValue(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -680,6 +680,7 @@ private DataFileMeta generateCompactAfter(List<DataFileMeta> toCompact) throws I
Collections.emptyList(),
null,
FileSource.APPEND,
null,
null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ public static class Data {
public final DataFileMeta meta;
public final List<KeyValue> content;

private Data(BinaryRow partition, int bucket, DataFileMeta meta, List<KeyValue> content) {
public Data(BinaryRow partition, int bucket, DataFileMeta meta, List<KeyValue> content) {
this.partition = partition;
this.bucket = bucket;
this.meta = meta;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Function;
Expand Down Expand Up @@ -96,6 +97,7 @@ public void testWriteAndReadDataFileWithFileExtractingRollingFile() throws Excep
private void testWriteAndReadDataFileImpl(String format) throws Exception {
DataFileTestDataGenerator.Data data = gen.next();
KeyValueFileWriterFactory writerFactory = createWriterFactory(tempDir.toString(), format);

DataFileMetaSerializer serializer = new DataFileMetaSerializer();

RollingFileWriter<KeyValue, DataFileMeta> writer =
Expand Down Expand Up @@ -381,6 +383,11 @@ private void checkRollingFiles(
for (DataFileMeta meta : actual) {
assertThat(meta.level()).isEqualTo(expected.level());
}

// assert actual externalPath is not null
for (DataFileMeta meta : actual) {
assertThat(Objects.requireNonNull(meta.externalPath()));
}
}

@ParameterizedTest
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,7 @@ private DataFileMeta newDataFile(long rowCount) {
Collections.emptyList(),
null,
null,
null,
null);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ class ScanHelperTest extends PaimonSparkTestBase {
new java.util.ArrayList[String](),
null,
FileSource.APPEND,
null,
null)
}

Expand Down Expand Up @@ -89,6 +90,7 @@ class ScanHelperTest extends PaimonSparkTestBase {
new java.util.ArrayList[String](),
null,
FileSource.APPEND,
null,
null)
).asJava

Expand Down

0 comments on commit f769ada

Please sign in to comment.