diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java index bb9e45ff002d..bb4a2eed7223 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java @@ -82,7 +82,8 @@ public class DataFileMeta { new DataField( 16, "_VALUE_STATS_COLS", - DataTypes.ARRAY(DataTypes.STRING().notNull())))); + DataTypes.ARRAY(DataTypes.STRING().notNull())), + new DataField(17, "_DATA_ROOT_LOCATION", newStringType(true)))); public static final BinaryRow EMPTY_MIN_KEY = EMPTY_ROW; public static final BinaryRow EMPTY_MAX_KEY = EMPTY_ROW; @@ -120,6 +121,12 @@ public class DataFileMeta { private final @Nullable List valueStatsCols; + /** + * The external path where the file resides; if it is null, the file is in the default + * warehouse path. + */ + private final @Nullable String externalPath; + public static DataFileMeta forAppend( String fileName, long fileSize, @@ -149,7 +156,8 @@ public static DataFileMeta forAppend( 0L, embeddedIndex, fileSource, - valueStatsCols); + valueStatsCols, + null); } public DataFileMeta( @@ -186,7 +194,8 @@ public DataFileMeta( deleteRowCount, embeddedIndex, fileSource, - valueStatsCols); + valueStatsCols, + null); } public DataFileMeta( @@ -222,7 +231,8 @@ public DataFileMeta( deleteRowCount, embeddedIndex, fileSource, - valueStatsCols); + valueStatsCols, + null); } public DataFileMeta( @@ -242,7 +252,8 @@ public DataFileMeta( @Nullable Long deleteRowCount, @Nullable byte[] embeddedIndex, @Nullable FileSource fileSource, - @Nullable List valueStatsCols) { + @Nullable List valueStatsCols, + @Nullable String externalPath) { this.fileName = fileName; this.fileSize = fileSize; @@ -264,6 +275,7 @@ public DataFileMeta( this.deleteRowCount = deleteRowCount; this.fileSource = fileSource; this.valueStatsCols = valueStatsCols; + this.externalPath = externalPath; } public String fileName() { @@ 
-357,6 +369,11 @@ public String fileFormat() { return split[split.length - 1]; } + @Nullable + public String getExternalPath() { + return externalPath; + } + public Optional fileSource() { return Optional.ofNullable(fileSource); } @@ -385,7 +402,8 @@ public DataFileMeta upgrade(int newLevel) { deleteRowCount, embeddedIndex, fileSource, - valueStatsCols); + valueStatsCols, + externalPath); } public DataFileMeta rename(String newFileName) { @@ -406,7 +424,8 @@ public DataFileMeta rename(String newFileName) { deleteRowCount, embeddedIndex, fileSource, - valueStatsCols); + valueStatsCols, + externalPath); } public DataFileMeta copyWithoutStats() { @@ -427,7 +446,8 @@ public DataFileMeta copyWithoutStats() { deleteRowCount, embeddedIndex, fileSource, - Collections.emptyList()); + Collections.emptyList(), + externalPath); } public List collectFiles(DataFilePathFactory pathFactory) { @@ -455,7 +475,8 @@ public DataFileMeta copy(List newExtraFiles) { deleteRowCount, embeddedIndex, fileSource, - valueStatsCols); + valueStatsCols, + externalPath); } public DataFileMeta copy(byte[] newEmbeddedIndex) { @@ -476,7 +497,8 @@ public DataFileMeta copy(byte[] newEmbeddedIndex) { deleteRowCount, newEmbeddedIndex, fileSource, - valueStatsCols); + valueStatsCols, + externalPath); } @Override @@ -504,7 +526,8 @@ public boolean equals(Object o) { && Objects.equals(creationTime, that.creationTime) && Objects.equals(deleteRowCount, that.deleteRowCount) && Objects.equals(fileSource, that.fileSource) - && Objects.equals(valueStatsCols, that.valueStatsCols); + && Objects.equals(valueStatsCols, that.valueStatsCols) + && Objects.equals(externalPath, that.externalPath); } @Override @@ -526,7 +549,8 @@ public int hashCode() { creationTime, deleteRowCount, fileSource, - valueStatsCols); + valueStatsCols, + externalPath); } @Override @@ -536,7 +560,7 @@ public String toString() { + "minKey: %s, maxKey: %s, keyStats: %s, valueStats: %s, " + "minSequenceNumber: %d, maxSequenceNumber: %d, " + 
"schemaId: %d, level: %d, extraFiles: %s, creationTime: %s, " - + "deleteRowCount: %d, fileSource: %s, valueStatsCols: %s}", + + "deleteRowCount: %d, fileSource: %s, valueStatsCols: %s, externalPath: %s}", fileName, fileSize, rowCount, @@ -553,7 +577,8 @@ public String toString() { creationTime, deleteRowCount, fileSource, - valueStatsCols); + valueStatsCols, + externalPath); } public static long getMaxSequenceNumber(List fileMetas) { diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta08Serializer.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta08Serializer.java index 03e4ed51f4be..e6c10f15342b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta08Serializer.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta08Serializer.java @@ -133,6 +133,7 @@ public DataFileMeta deserialize(DataInputView in) throws IOException { row.isNullAt(13) ? null : row.getLong(13), row.isNullAt(14) ? null : row.getBinary(14), null, + null, null); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta09Serializer.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta09Serializer.java index 2f8d89f5b1ab..36d1ad260fc9 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta09Serializer.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta09Serializer.java @@ -139,6 +139,7 @@ public DataFileMeta deserialize(DataInputView in) throws IOException { row.isNullAt(13) ? null : row.getLong(13), row.isNullAt(14) ? null : row.getBinary(14), row.isNullAt(15) ? 
null : FileSource.fromByteValue(row.getByte(15)), + null, null); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta10LegacySerializer.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta10LegacySerializer.java new file mode 100644 index 000000000000..68ccba6ea31c --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta10LegacySerializer.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.io; + +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.data.safe.SafeBinaryRow; +import org.apache.paimon.data.serializer.InternalRowSerializer; +import org.apache.paimon.data.serializer.InternalSerializers; +import org.apache.paimon.manifest.FileSource; +import org.apache.paimon.stats.SimpleStats; +import org.apache.paimon.types.ArrayType; +import org.apache.paimon.types.BigIntType; +import org.apache.paimon.types.DataField; +import org.apache.paimon.types.DataTypes; +import org.apache.paimon.types.IntType; +import org.apache.paimon.types.RowType; +import org.apache.paimon.types.TinyIntType; + +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import static org.apache.paimon.utils.InternalRowUtils.fromStringArrayData; +import static org.apache.paimon.utils.InternalRowUtils.toStringArrayData; +import static org.apache.paimon.utils.SerializationUtils.deserializeBinaryRow; +import static org.apache.paimon.utils.SerializationUtils.newBytesType; +import static org.apache.paimon.utils.SerializationUtils.newStringType; +import static org.apache.paimon.utils.SerializationUtils.serializeBinaryRow; + +/** Serializer for {@link DataFileMeta} with 1.0 version. 
*/ +public class DataFileMeta10LegacySerializer implements Serializable { + + private static final long serialVersionUID = 1L; + + public static final RowType SCHEMA = + new RowType( + false, + Arrays.asList( + new DataField(0, "_FILE_NAME", newStringType(false)), + new DataField(1, "_FILE_SIZE", new BigIntType(false)), + new DataField(2, "_ROW_COUNT", new BigIntType(false)), + new DataField(3, "_MIN_KEY", newBytesType(false)), + new DataField(4, "_MAX_KEY", newBytesType(false)), + new DataField(5, "_KEY_STATS", SimpleStats.SCHEMA), + new DataField(6, "_VALUE_STATS", SimpleStats.SCHEMA), + new DataField(7, "_MIN_SEQUENCE_NUMBER", new BigIntType(false)), + new DataField(8, "_MAX_SEQUENCE_NUMBER", new BigIntType(false)), + new DataField(9, "_SCHEMA_ID", new BigIntType(false)), + new DataField(10, "_LEVEL", new IntType(false)), + new DataField( + 11, "_EXTRA_FILES", new ArrayType(false, newStringType(false))), + new DataField(12, "_CREATION_TIME", DataTypes.TIMESTAMP_MILLIS()), + new DataField(13, "_DELETE_ROW_COUNT", new BigIntType(true)), + new DataField(14, "_EMBEDDED_FILE_INDEX", newBytesType(true)), + new DataField(15, "_FILE_SOURCE", new TinyIntType(true)), + new DataField( + 16, + "_VALUE_STATS_COLS", + DataTypes.ARRAY(DataTypes.STRING().notNull())))); + + protected final InternalRowSerializer rowSerializer; + + public DataFileMeta10LegacySerializer() { + this.rowSerializer = InternalSerializers.create(SCHEMA); + } + + public final void serializeList(List records, DataOutputView target) + throws IOException { + target.writeInt(records.size()); + for (DataFileMeta t : records) { + serialize(t, target); + } + } + + public void serialize(DataFileMeta meta, DataOutputView target) throws IOException { + GenericRow row = + GenericRow.of( + BinaryString.fromString(meta.fileName()), + meta.fileSize(), + meta.rowCount(), + serializeBinaryRow(meta.minKey()), + serializeBinaryRow(meta.maxKey()), + meta.keyStats().toRow(), + meta.valueStats().toRow(), + 
meta.minSequenceNumber(), + meta.maxSequenceNumber(), + meta.schemaId(), + meta.level(), + toStringArrayData(meta.extraFiles()), + meta.creationTime(), + meta.deleteRowCount().orElse(null), + meta.embeddedIndex(), + meta.fileSource().map(FileSource::toByteValue).orElse(null), + toStringArrayData(meta.valueStatsCols())); + rowSerializer.serialize(row, target); + } + + public final List deserializeList(DataInputView source) throws IOException { + int size = source.readInt(); + List records = new ArrayList<>(size); + for (int i = 0; i < size; i++) { + records.add(deserialize(source)); + } + return records; + } + + public DataFileMeta deserialize(DataInputView in) throws IOException { + byte[] bytes = new byte[in.readInt()]; + in.readFully(bytes); + SafeBinaryRow row = new SafeBinaryRow(rowSerializer.getArity(), bytes, 0); + return new DataFileMeta( + row.getString(0).toString(), + row.getLong(1), + row.getLong(2), + deserializeBinaryRow(row.getBinary(3)), + deserializeBinaryRow(row.getBinary(4)), + SimpleStats.fromRow(row.getRow(5, 3)), + SimpleStats.fromRow(row.getRow(6, 3)), + row.getLong(7), + row.getLong(8), + row.getLong(9), + row.getInt(10), + fromStringArrayData(row.getArray(11)), + row.getTimestamp(12, 3), + row.isNullAt(13) ? null : row.getLong(13), + row.isNullAt(14) ? null : row.getBinary(14), + row.isNullAt(15) ? null : FileSource.fromByteValue(row.getByte(15)), + row.isNullAt(16) ? 
null : fromStringArrayData(row.getArray(16)), + null); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMetaSerializer.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMetaSerializer.java index 626201ca30ce..2d109d96d7a8 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMetaSerializer.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMetaSerializer.java @@ -58,7 +58,8 @@ public InternalRow toRow(DataFileMeta meta) { meta.deleteRowCount().orElse(null), meta.embeddedIndex(), meta.fileSource().map(FileSource::toByteValue).orElse(null), - toStringArrayData(meta.valueStatsCols())); + toStringArrayData(meta.valueStatsCols()), + BinaryString.fromString(meta.getExternalPath())); } @Override @@ -80,6 +81,7 @@ public DataFileMeta fromRow(InternalRow row) { row.isNullAt(13) ? null : row.getLong(13), row.isNullAt(14) ? null : row.getBinary(14), row.isNullAt(15) ? null : FileSource.fromByteValue(row.getByte(15)), - row.isNullAt(16) ? null : fromStringArrayData(row.getArray(16))); + row.isNullAt(16) ? null : fromStringArrayData(row.getArray(16)), + row.isNullAt(17) ? 
null : row.getString(17).toString()); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageLegacyV2Serializer.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageLegacyV2Serializer.java index 3e351cd1dae9..5da96da765fe 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageLegacyV2Serializer.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageLegacyV2Serializer.java @@ -155,6 +155,7 @@ public DataFileMeta fromRow(InternalRow row) { null, null, null, + null, null); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java index 9fc251c36672..c65f8302aa87 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java @@ -26,6 +26,7 @@ import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.io.DataFileMeta08Serializer; import org.apache.paimon.io.DataFileMeta09Serializer; +import org.apache.paimon.io.DataFileMeta10LegacySerializer; import org.apache.paimon.io.DataFileMetaSerializer; import org.apache.paimon.io.DataIncrement; import org.apache.paimon.io.DataInputDeserializer; @@ -47,11 +48,12 @@ /** {@link VersionedSerializer} for {@link CommitMessage}. 
*/ public class CommitMessageSerializer implements VersionedSerializer { - private static final int CURRENT_VERSION = 5; + private static final int CURRENT_VERSION = 6; private final DataFileMetaSerializer dataFileSerializer; private final IndexFileMetaSerializer indexEntrySerializer; + private DataFileMeta10LegacySerializer dataFileMeta10LegacySerializer; private DataFileMeta09Serializer dataFile09Serializer; private DataFileMeta08Serializer dataFile08Serializer; private IndexFileMeta09Serializer indexEntry09Serializer; @@ -129,8 +131,13 @@ private CommitMessage deserialize(int version, DataInputView view) throws IOExce private IOExceptionSupplier> fileDeserializer( int version, DataInputView view) { - if (version >= 4) { + if (version >= 5) { return () -> dataFileSerializer.deserializeList(view); + } else if (version == 4) { + if (dataFileMeta10LegacySerializer == null) { + dataFileMeta10LegacySerializer = new DataFileMeta10LegacySerializer(); + } + return () -> dataFileMeta10LegacySerializer.deserializeList(view); } else if (version == 3) { if (dataFile09Serializer == null) { dataFile09Serializer = new DataFileMeta09Serializer(); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java index b9460f28b4e7..40673ee78826 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java @@ -22,6 +22,7 @@ import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.io.DataFileMeta08Serializer; import org.apache.paimon.io.DataFileMeta09Serializer; +import org.apache.paimon.io.DataFileMeta10LegacySerializer; import org.apache.paimon.io.DataFileMetaSerializer; import org.apache.paimon.io.DataInputView; import org.apache.paimon.io.DataInputViewStreamWrapper; @@ -51,7 +52,7 @@ public class DataSplit implements Split { private static final long serialVersionUID = 7L; 
private static final long MAGIC = -2394839472490812314L; - private static final int VERSION = 4; + private static final int VERSION = 5; private long snapshotId = 0; private BinaryRow partition; @@ -362,7 +363,10 @@ private static FunctionWithIOException getFileMetaS } else if (version == 2) { DataFileMeta09Serializer serializer = new DataFileMeta09Serializer(); return serializer::deserialize; - } else if (version >= 3) { + } else if (version == 3) { + DataFileMeta10LegacySerializer serializer = new DataFileMeta10LegacySerializer(); + return serializer::deserialize; + } else if (version >= 4) { DataFileMetaSerializer serializer = new DataFileMetaSerializer(); return serializer::deserialize; } else { diff --git a/paimon-core/src/test/java/org/apache/paimon/crosspartition/IndexBootstrapTest.java b/paimon-core/src/test/java/org/apache/paimon/crosspartition/IndexBootstrapTest.java index be4147735614..27fa311ddb0a 100644 --- a/paimon-core/src/test/java/org/apache/paimon/crosspartition/IndexBootstrapTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/crosspartition/IndexBootstrapTest.java @@ -160,6 +160,7 @@ private static DataFileMeta newFile(long timeMillis) { 0L, null, FileSource.APPEND, + null, null); } diff --git a/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestUtils.java b/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestUtils.java index 48c8d44876ae..a44ef9a53085 100644 --- a/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestUtils.java +++ b/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestUtils.java @@ -57,6 +57,7 @@ public static DataFileMeta newFile(long minSeq, long maxSeq) { maxSeq - minSeq + 1, null, FileSource.APPEND, + null, null); } diff --git a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerCompatibilityTest.java b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerCompatibilityTest.java index fbc02b2d73f2..34af55165954 100644 --- 
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerCompatibilityTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerCompatibilityTest.java @@ -75,7 +75,8 @@ public void testProduction() throws IOException { 11L, new byte[] {1, 2, 4}, FileSource.COMPACT, - Arrays.asList("field1", "field2", "field3")); + Arrays.asList("field1", "field2", "field3"), + "hdfs://localhost:9000/path/to/file"); List dataFiles = Collections.singletonList(dataFile); LinkedHashMap dvMetas = new LinkedHashMap<>(); @@ -106,6 +107,76 @@ public void testProduction() throws IOException { assertThat(deserialized).isEqualTo(manifestCommittable); } + @Test + public void testCompatibilityToVersion5() throws IOException { + SimpleStats keyStats = + new SimpleStats( + singleColumn("min_key"), + singleColumn("max_key"), + fromLongArray(new Long[] {0L})); + SimpleStats valueStats = + new SimpleStats( + singleColumn("min_value"), + singleColumn("max_value"), + fromLongArray(new Long[] {0L})); + DataFileMeta dataFile = + new DataFileMeta( + "my_file", + 1024 * 1024, + 1024, + singleColumn("min_key"), + singleColumn("max_key"), + keyStats, + valueStats, + 15, + 200, + 5, + 3, + Arrays.asList("extra1", "extra2"), + Timestamp.fromLocalDateTime(LocalDateTime.parse("2022-03-02T20:20:12")), + 11L, + new byte[] {1, 2, 4}, + FileSource.COMPACT, + Arrays.asList("field1", "field2", "field3"), + "hdfs://localhost:9000/path/to/file"); + List dataFiles = Collections.singletonList(dataFile); + + LinkedHashMap dvMetas = new LinkedHashMap<>(); + dvMetas.put("dv_key1", new DeletionVectorMeta("dv_key1", 1, 2, null)); + dvMetas.put("dv_key2", new DeletionVectorMeta("dv_key2", 3, 4, null)); + IndexFileMeta indexFile = + new IndexFileMeta("my_index_type", "my_index_file", 1024 * 100, 1002, dvMetas); + List indexFiles = Collections.singletonList(indexFile); + + CommitMessageImpl commitMessage = + new CommitMessageImpl( + singleColumn("my_partition"), 
+ 11, + new DataIncrement(dataFiles, dataFiles, dataFiles), + new CompactIncrement(dataFiles, dataFiles, dataFiles), + new IndexIncrement(indexFiles)); + + ManifestCommittable manifestCommittable = + new ManifestCommittable( + 5, + 202020L, + Collections.singletonMap(5, 555L), + Collections.singletonList(commitMessage)); + + ManifestCommittableSerializer serializer = new ManifestCommittableSerializer(); + byte[] bytes = serializer.serialize(manifestCommittable); + ManifestCommittable deserialized = serializer.deserialize(3, bytes); + assertThat(deserialized).isEqualTo(manifestCommittable); + byte[] v2Bytes = + IOUtils.readFully( + ManifestCommittableSerializerCompatibilityTest.class + .getClassLoader() + .getResourceAsStream("compatibility/manifest-committable-v5"), + true); + deserialized = serializer.deserialize(2, v2Bytes); + assertThat(deserialized).isEqualTo(manifestCommittable); + } + @Test public void testCompatibilityToVersion4() throws IOException { SimpleStats keyStats = @@ -136,7 +207,8 @@ public void testCompatibilityToVersion4() throws IOException { 11L, new byte[] {1, 2, 4}, FileSource.COMPACT, - Arrays.asList("field1", "field2", "field3")); + Arrays.asList("field1", "field2", "field3"), + null); List dataFiles = Collections.singletonList(dataFile); LinkedHashMap dvMetas = new LinkedHashMap<>(); @@ -206,6 +278,7 @@ public void testCompatibilityToVersion3() throws IOException { 11L, new byte[] {1, 2, 4}, FileSource.COMPACT, + null, null); List dataFiles = Collections.singletonList(dataFile); @@ -276,6 +349,7 @@ public void testCompatibilityToVersion2() throws IOException { 11L, new byte[] {1, 2, 4}, null, + null, null); List dataFiles = Collections.singletonList(dataFile); @@ -346,6 +420,7 @@ public void testCompatibilityToVersion2PaimonV07() throws IOException { null, null, null, + null, null); List dataFiles = Collections.singletonList(dataFile); diff --git a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java 
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java index 52d82e76be2a..19bd6a856bf9 100644 --- a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java +++ b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java @@ -95,6 +95,7 @@ protected ManifestEntry makeEntry( 0L, // not used embeddedIndex, // not used FileSource.APPEND, + null, null)); } diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/IntervalPartitionTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/IntervalPartitionTest.java index bdee5c5f7507..94c11498c5db 100644 --- a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/IntervalPartitionTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/IntervalPartitionTest.java @@ -184,6 +184,7 @@ private DataFileMeta makeInterval(int left, int right) { 0L, null, FileSource.APPEND, + null, null); } diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java index 9dc98343734b..abff820b2cb4 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java @@ -214,6 +214,7 @@ public void testExpireExtraFiles() throws IOException { 0L, null, FileSource.APPEND, + null, null); ManifestEntry add = new ManifestEntry(FileKind.ADD, partition, 0, 1, dataFile); ManifestEntry delete = new ManifestEntry(FileKind.DELETE, partition, 0, 1, dataFile); diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java b/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java index 0219941a0ac0..88394d2dc33b 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java +++ 
b/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java @@ -139,7 +139,8 @@ public void testSerializerNormal() throws Exception { 11L, new byte[] {1, 2, 4}, FileSource.COMPACT, - Arrays.asList("field1", "field2", "field3")); + Arrays.asList("field1", "field2", "field3"), + "hdfs:///path/to/warehouse"); List dataFiles = Collections.singletonList(dataFile); DeletionFile deletionFile = new DeletionFile("deletion_file", 100, 22, 33L); @@ -194,6 +195,7 @@ public void testSerializerCompatibleV1() throws Exception { 11L, new byte[] {1, 2, 4}, null, + null, null); List dataFiles = Collections.singletonList(dataFile); @@ -254,6 +256,7 @@ public void testSerializerCompatibleV2() throws Exception { 11L, new byte[] {1, 2, 4}, FileSource.COMPACT, + null, null); List dataFiles = Collections.singletonList(dataFile); @@ -314,7 +317,8 @@ public void testSerializerCompatibleV3() throws Exception { 11L, new byte[] {1, 2, 4}, FileSource.COMPACT, - Arrays.asList("field1", "field2", "field3")); + Arrays.asList("field1", "field2", "field3"), + null); List dataFiles = Collections.singletonList(dataFile); DeletionFile deletionFile = new DeletionFile("deletion_file", 100, 22, null); @@ -347,6 +351,71 @@ public void testSerializerCompatibleV3() throws Exception { assertThat(actual).isEqualTo(split); } + @Test + public void testSerializerCompatibleV4() throws Exception { + SimpleStats keyStats = + new SimpleStats( + singleColumn("min_key"), + singleColumn("max_key"), + fromLongArray(new Long[] {0L})); + SimpleStats valueStats = + new SimpleStats( + singleColumn("min_value"), + singleColumn("max_value"), + fromLongArray(new Long[] {0L})); + + DataFileMeta dataFile = + new DataFileMeta( + "my_file", + 1024 * 1024, + 1024, + singleColumn("min_key"), + singleColumn("max_key"), + keyStats, + valueStats, + 15, + 200, + 5, + 3, + Arrays.asList("extra1", "extra2"), + Timestamp.fromLocalDateTime(LocalDateTime.parse("2022-03-02T20:20:12")), + 11L, + new byte[] {1, 2, 4}, + 
FileSource.COMPACT, + Arrays.asList("field1", "field2", "field3"), + "hdfs:///path/to/warehouse"); + List dataFiles = Collections.singletonList(dataFile); + + DeletionFile deletionFile = new DeletionFile("deletion_file", 100, 22, null); + List deletionFiles = Collections.singletonList(deletionFile); + + BinaryRow partition = new BinaryRow(1); + BinaryRowWriter binaryRowWriter = new BinaryRowWriter(partition); + binaryRowWriter.writeString(0, BinaryString.fromString("aaaaa")); + binaryRowWriter.complete(); + + DataSplit split = + DataSplit.builder() + .withSnapshot(18) + .withPartition(partition) + .withBucket(20) + .withDataFiles(dataFiles) + .withDataDeletionFiles(deletionFiles) + .withBucketPath("my path") + .build(); + + byte[] v2Bytes = + IOUtils.readFully( + SplitTest.class + .getClassLoader() + .getResourceAsStream("compatibility/datasplit-v4"), + true); + + DataSplit actual = + InstantiationUtil.deserializeObject(v2Bytes, DataSplit.class.getClassLoader()); + assertThat(actual).isEqualTo(split); + } + private DataFileMeta newDataFile(long rowCount) { return DataFileMeta.forAppend( "my_data_file.parquet", diff --git a/paimon-core/src/test/resources/compatibility/datasplit-v4 b/paimon-core/src/test/resources/compatibility/datasplit-v4 new file mode 100644 index 000000000000..6ccef002b15d Binary files /dev/null and b/paimon-core/src/test/resources/compatibility/datasplit-v4 differ diff --git a/paimon-core/src/test/resources/compatibility/manifest-committable-v5 b/paimon-core/src/test/resources/compatibility/manifest-committable-v5 new file mode 100644 index 000000000000..8b2b05869bcf Binary files /dev/null and b/paimon-core/src/test/resources/compatibility/manifest-committable-v5 differ