Skip to content

Commit

Permalink
[core] Add _EXTERNAL_PATH in DataFileMeta
Browse files Browse the repository at this point in the history
This closes #4751
  • Loading branch information
neuyilan authored and JingsongLi committed Dec 23, 2024
1 parent 1fd81f2 commit 9ff5192
Show file tree
Hide file tree
Showing 17 changed files with 361 additions and 24 deletions.
50 changes: 36 additions & 14 deletions paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,8 @@ public class DataFileMeta {
new DataField(
16,
"_VALUE_STATS_COLS",
DataTypes.ARRAY(DataTypes.STRING().notNull()))));
DataTypes.ARRAY(DataTypes.STRING().notNull())),
new DataField(17, "_EXTERNAL_PATH", newStringType(true))));

public static final BinaryRow EMPTY_MIN_KEY = EMPTY_ROW;
public static final BinaryRow EMPTY_MAX_KEY = EMPTY_ROW;
Expand Down Expand Up @@ -120,6 +121,9 @@ public class DataFileMeta {

private final @Nullable List<String> valueStatsCols;

/** external path of file, if it is null, it is in the default warehouse path. */
private final @Nullable String externalPath;

public static DataFileMeta forAppend(
String fileName,
long fileSize,
Expand Down Expand Up @@ -149,7 +153,8 @@ public static DataFileMeta forAppend(
0L,
embeddedIndex,
fileSource,
valueStatsCols);
valueStatsCols,
null);
}

public DataFileMeta(
Expand Down Expand Up @@ -186,7 +191,8 @@ public DataFileMeta(
deleteRowCount,
embeddedIndex,
fileSource,
valueStatsCols);
valueStatsCols,
null);
}

public DataFileMeta(
Expand Down Expand Up @@ -222,7 +228,8 @@ public DataFileMeta(
deleteRowCount,
embeddedIndex,
fileSource,
valueStatsCols);
valueStatsCols,
null);
}

public DataFileMeta(
Expand All @@ -242,7 +249,8 @@ public DataFileMeta(
@Nullable Long deleteRowCount,
@Nullable byte[] embeddedIndex,
@Nullable FileSource fileSource,
@Nullable List<String> valueStatsCols) {
@Nullable List<String> valueStatsCols,
@Nullable String externalPath) {
this.fileName = fileName;
this.fileSize = fileSize;

Expand All @@ -264,6 +272,7 @@ public DataFileMeta(
this.deleteRowCount = deleteRowCount;
this.fileSource = fileSource;
this.valueStatsCols = valueStatsCols;
this.externalPath = externalPath;
}

public String fileName() {
Expand Down Expand Up @@ -357,6 +366,11 @@ public String fileFormat() {
return split[split.length - 1];
}

@Nullable
public String externalPath() {
return externalPath;
}

public Optional<FileSource> fileSource() {
return Optional.ofNullable(fileSource);
}
Expand Down Expand Up @@ -385,7 +399,8 @@ public DataFileMeta upgrade(int newLevel) {
deleteRowCount,
embeddedIndex,
fileSource,
valueStatsCols);
valueStatsCols,
externalPath);
}

public DataFileMeta rename(String newFileName) {
Expand All @@ -406,7 +421,8 @@ public DataFileMeta rename(String newFileName) {
deleteRowCount,
embeddedIndex,
fileSource,
valueStatsCols);
valueStatsCols,
externalPath);
}

public DataFileMeta copyWithoutStats() {
Expand All @@ -427,7 +443,8 @@ public DataFileMeta copyWithoutStats() {
deleteRowCount,
embeddedIndex,
fileSource,
Collections.emptyList());
Collections.emptyList(),
externalPath);
}

public List<Path> collectFiles(DataFilePathFactory pathFactory) {
Expand Down Expand Up @@ -455,7 +472,8 @@ public DataFileMeta copy(List<String> newExtraFiles) {
deleteRowCount,
embeddedIndex,
fileSource,
valueStatsCols);
valueStatsCols,
externalPath);
}

public DataFileMeta copy(byte[] newEmbeddedIndex) {
Expand All @@ -476,7 +494,8 @@ public DataFileMeta copy(byte[] newEmbeddedIndex) {
deleteRowCount,
newEmbeddedIndex,
fileSource,
valueStatsCols);
valueStatsCols,
externalPath);
}

@Override
Expand Down Expand Up @@ -504,7 +523,8 @@ public boolean equals(Object o) {
&& Objects.equals(creationTime, that.creationTime)
&& Objects.equals(deleteRowCount, that.deleteRowCount)
&& Objects.equals(fileSource, that.fileSource)
&& Objects.equals(valueStatsCols, that.valueStatsCols);
&& Objects.equals(valueStatsCols, that.valueStatsCols)
&& Objects.equals(externalPath, that.externalPath);
}

@Override
Expand All @@ -526,7 +546,8 @@ public int hashCode() {
creationTime,
deleteRowCount,
fileSource,
valueStatsCols);
valueStatsCols,
externalPath);
}

@Override
Expand All @@ -536,7 +557,7 @@ public String toString() {
+ "minKey: %s, maxKey: %s, keyStats: %s, valueStats: %s, "
+ "minSequenceNumber: %d, maxSequenceNumber: %d, "
+ "schemaId: %d, level: %d, extraFiles: %s, creationTime: %s, "
+ "deleteRowCount: %d, fileSource: %s, valueStatsCols: %s}",
+ "deleteRowCount: %d, fileSource: %s, valueStatsCols: %s, externalPath: %s}",
fileName,
fileSize,
rowCount,
Expand All @@ -553,7 +574,8 @@ public String toString() {
creationTime,
deleteRowCount,
fileSource,
valueStatsCols);
valueStatsCols,
externalPath);
}

public static long getMaxSequenceNumber(List<DataFileMeta> fileMetas) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ public DataFileMeta deserialize(DataInputView in) throws IOException {
row.isNullAt(13) ? null : row.getLong(13),
row.isNullAt(14) ? null : row.getBinary(14),
null,
null,
null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ public DataFileMeta deserialize(DataInputView in) throws IOException {
row.isNullAt(13) ? null : row.getLong(13),
row.isNullAt(14) ? null : row.getBinary(14),
row.isNullAt(15) ? null : FileSource.fromByteValue(row.getByte(15)),
null,
null);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.io;

import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.safe.SafeBinaryRow;
import org.apache.paimon.data.serializer.InternalRowSerializer;
import org.apache.paimon.data.serializer.InternalSerializers;
import org.apache.paimon.manifest.FileSource;
import org.apache.paimon.stats.SimpleStats;
import org.apache.paimon.types.ArrayType;
import org.apache.paimon.types.BigIntType;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.IntType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.types.TinyIntType;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import static org.apache.paimon.utils.InternalRowUtils.fromStringArrayData;
import static org.apache.paimon.utils.InternalRowUtils.toStringArrayData;
import static org.apache.paimon.utils.SerializationUtils.deserializeBinaryRow;
import static org.apache.paimon.utils.SerializationUtils.newBytesType;
import static org.apache.paimon.utils.SerializationUtils.newStringType;
import static org.apache.paimon.utils.SerializationUtils.serializeBinaryRow;

/** Serializer for {@link DataFileMeta} with 0.9 version. */
public class DataFileMeta10LegacySerializer implements Serializable {

private static final long serialVersionUID = 1L;

public static final RowType SCHEMA =
new RowType(
false,
Arrays.asList(
new DataField(0, "_FILE_NAME", newStringType(false)),
new DataField(1, "_FILE_SIZE", new BigIntType(false)),
new DataField(2, "_ROW_COUNT", new BigIntType(false)),
new DataField(3, "_MIN_KEY", newBytesType(false)),
new DataField(4, "_MAX_KEY", newBytesType(false)),
new DataField(5, "_KEY_STATS", SimpleStats.SCHEMA),
new DataField(6, "_VALUE_STATS", SimpleStats.SCHEMA),
new DataField(7, "_MIN_SEQUENCE_NUMBER", new BigIntType(false)),
new DataField(8, "_MAX_SEQUENCE_NUMBER", new BigIntType(false)),
new DataField(9, "_SCHEMA_ID", new BigIntType(false)),
new DataField(10, "_LEVEL", new IntType(false)),
new DataField(
11, "_EXTRA_FILES", new ArrayType(false, newStringType(false))),
new DataField(12, "_CREATION_TIME", DataTypes.TIMESTAMP_MILLIS()),
new DataField(13, "_DELETE_ROW_COUNT", new BigIntType(true)),
new DataField(14, "_EMBEDDED_FILE_INDEX", newBytesType(true)),
new DataField(15, "_FILE_SOURCE", new TinyIntType(true)),
new DataField(
16,
"_VALUE_STATS_COLS",
DataTypes.ARRAY(DataTypes.STRING().notNull()))));

protected final InternalRowSerializer rowSerializer;

public DataFileMeta10LegacySerializer() {
this.rowSerializer = InternalSerializers.create(SCHEMA);
}

public final void serializeList(List<DataFileMeta> records, DataOutputView target)
throws IOException {
target.writeInt(records.size());
for (DataFileMeta t : records) {
serialize(t, target);
}
}

public void serialize(DataFileMeta meta, DataOutputView target) throws IOException {
GenericRow row =
GenericRow.of(
BinaryString.fromString(meta.fileName()),
meta.fileSize(),
meta.rowCount(),
serializeBinaryRow(meta.minKey()),
serializeBinaryRow(meta.maxKey()),
meta.keyStats().toRow(),
meta.valueStats().toRow(),
meta.minSequenceNumber(),
meta.maxSequenceNumber(),
meta.schemaId(),
meta.level(),
toStringArrayData(meta.extraFiles()),
meta.creationTime(),
meta.deleteRowCount().orElse(null),
meta.embeddedIndex(),
meta.fileSource().map(FileSource::toByteValue).orElse(null),
toStringArrayData(meta.valueStatsCols()));
rowSerializer.serialize(row, target);
}

public final List<DataFileMeta> deserializeList(DataInputView source) throws IOException {
int size = source.readInt();
List<DataFileMeta> records = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
records.add(deserialize(source));
}
return records;
}

public DataFileMeta deserialize(DataInputView in) throws IOException {
byte[] bytes = new byte[in.readInt()];
in.readFully(bytes);
SafeBinaryRow row = new SafeBinaryRow(rowSerializer.getArity(), bytes, 0);
return new DataFileMeta(
row.getString(0).toString(),
row.getLong(1),
row.getLong(2),
deserializeBinaryRow(row.getBinary(3)),
deserializeBinaryRow(row.getBinary(4)),
SimpleStats.fromRow(row.getRow(5, 3)),
SimpleStats.fromRow(row.getRow(6, 3)),
row.getLong(7),
row.getLong(8),
row.getLong(9),
row.getInt(10),
fromStringArrayData(row.getArray(11)),
row.getTimestamp(12, 3),
row.isNullAt(13) ? null : row.getLong(13),
row.isNullAt(14) ? null : row.getBinary(14),
row.isNullAt(15) ? null : FileSource.fromByteValue(row.getByte(15)),
row.isNullAt(16) ? null : fromStringArrayData(row.getArray(16)),
null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ public InternalRow toRow(DataFileMeta meta) {
meta.deleteRowCount().orElse(null),
meta.embeddedIndex(),
meta.fileSource().map(FileSource::toByteValue).orElse(null),
toStringArrayData(meta.valueStatsCols()));
toStringArrayData(meta.valueStatsCols()),
BinaryString.fromString(meta.externalPath()));
}

@Override
Expand All @@ -80,6 +81,7 @@ public DataFileMeta fromRow(InternalRow row) {
row.isNullAt(13) ? null : row.getLong(13),
row.isNullAt(14) ? null : row.getBinary(14),
row.isNullAt(15) ? null : FileSource.fromByteValue(row.getByte(15)),
row.isNullAt(16) ? null : fromStringArrayData(row.getArray(16)));
row.isNullAt(16) ? null : fromStringArrayData(row.getArray(16)),
row.isNullAt(17) ? null : row.getString(17).toString());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ public DataFileMeta fromRow(InternalRow row) {
null,
null,
null,
null,
null);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataFileMeta08Serializer;
import org.apache.paimon.io.DataFileMeta09Serializer;
import org.apache.paimon.io.DataFileMeta10LegacySerializer;
import org.apache.paimon.io.DataFileMetaSerializer;
import org.apache.paimon.io.DataIncrement;
import org.apache.paimon.io.DataInputDeserializer;
Expand All @@ -47,11 +48,12 @@
/** {@link VersionedSerializer} for {@link CommitMessage}. */
public class CommitMessageSerializer implements VersionedSerializer<CommitMessage> {

private static final int CURRENT_VERSION = 5;
private static final int CURRENT_VERSION = 6;

private final DataFileMetaSerializer dataFileSerializer;
private final IndexFileMetaSerializer indexEntrySerializer;

private DataFileMeta10LegacySerializer dataFileMeta10LegacySerializer;
private DataFileMeta09Serializer dataFile09Serializer;
private DataFileMeta08Serializer dataFile08Serializer;
private IndexFileMeta09Serializer indexEntry09Serializer;
Expand Down Expand Up @@ -129,8 +131,13 @@ private CommitMessage deserialize(int version, DataInputView view) throws IOExce

private IOExceptionSupplier<List<DataFileMeta>> fileDeserializer(
int version, DataInputView view) {
if (version >= 4) {
if (version >= 5) {
return () -> dataFileSerializer.deserializeList(view);
} else if (version == 4) {
if (dataFileMeta10LegacySerializer == null) {
dataFileMeta10LegacySerializer = new DataFileMeta10LegacySerializer();
}
return () -> dataFileMeta10LegacySerializer.deserializeList(view);
} else if (version == 3) {
if (dataFile09Serializer == null) {
dataFile09Serializer = new DataFileMeta09Serializer();
Expand Down
Loading

0 comments on commit 9ff5192

Please sign in to comment.