diff --git a/docs/content/concepts/spec/manifest.md b/docs/content/concepts/spec/manifest.md index bc7318331a51..8460febf7865 100644 --- a/docs/content/concepts/spec/manifest.md +++ b/docs/content/concepts/spec/manifest.md @@ -111,5 +111,9 @@ The index file meta is: 2. fileName: file name. 3. fileSize: file size. 4. rowCount: total number of rows. -5. deletionVectorsRanges: Metadata only used by "DELETION_VECTORS", Stores offset and length of each data file, - The schema is `ARRAY<ROW<f0: STRING, f1: INT, f2: INT>>`. +5. deletionVectorsRanges: Metadata only used by "DELETION_VECTORS", is an array of deletion vector meta, the schema of each deletion vector meta is: + 1. f0: the data file name corresponding to this deletion vector. + 2. f1: the starting offset of this deletion vector in the index file. + 3. f2: the length of this deletion vector in the index file. + 4. _CARDINALITY: the number of deleted rows. + diff --git a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/BitmapDeletionVector.java b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/BitmapDeletionVector.java index a2c592596646..51ae729c2193 100644 --- a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/BitmapDeletionVector.java +++ b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/BitmapDeletionVector.java @@ -117,4 +117,9 @@ public boolean equals(Object o) { BitmapDeletionVector that = (BitmapDeletionVector) o; return Objects.equals(this.roaringBitmap, that.roaringBitmap); } + + @Override + public int hashCode() { + return Objects.hashCode(roaringBitmap); + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileWriter.java index f8c8330f190c..5246d35d4b31 100644 --- a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileWriter.java +++ 
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileWriter.java @@ -20,9 +20,9 @@ import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; +import org.apache.paimon.index.DeletionVectorMeta; import org.apache.paimon.index.IndexFileMeta; import org.apache.paimon.options.MemorySize; -import org.apache.paimon.utils.Pair; import org.apache.paimon.utils.PathFactory; import org.apache.paimon.utils.Preconditions; @@ -104,13 +104,13 @@ private class SingleIndexFileWriter implements Closeable { private final Path path; private final DataOutputStream dataOutputStream; - private final LinkedHashMap> dvRanges; + private final LinkedHashMap dvMetas; private SingleIndexFileWriter() throws IOException { this.path = indexPathFactory.newPath(); this.dataOutputStream = new DataOutputStream(fileIO.newOutputStream(path, true)); dataOutputStream.writeByte(VERSION_ID_V1); - this.dvRanges = new LinkedHashMap<>(); + this.dvMetas = new LinkedHashMap<>(); } private long writtenSizeInBytes() { @@ -121,7 +121,10 @@ private void write(String key, DeletionVector deletionVector) throws IOException Preconditions.checkNotNull(dataOutputStream); byte[] data = deletionVector.serializeToBytes(); int size = data.length; - dvRanges.put(key, Pair.of(dataOutputStream.size(), size)); + dvMetas.put( + key, + new DeletionVectorMeta( + key, dataOutputStream.size(), size, deletionVector.getCardinality())); dataOutputStream.writeInt(size); dataOutputStream.write(data); dataOutputStream.writeInt(calculateChecksum(data)); @@ -132,8 +135,8 @@ public IndexFileMeta writtenIndexFile() { DELETION_VECTORS_INDEX, path.getName(), writtenSizeInBytes(), - dvRanges.size(), - dvRanges); + dvMetas.size(), + dvMetas); } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFile.java b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFile.java index 798404e001e5..77abb2d72985 100644 --- 
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFile.java +++ b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFile.java @@ -21,11 +21,11 @@ import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; import org.apache.paimon.fs.SeekableInputStream; +import org.apache.paimon.index.DeletionVectorMeta; import org.apache.paimon.index.IndexFile; import org.apache.paimon.index.IndexFileMeta; import org.apache.paimon.options.MemorySize; import org.apache.paimon.table.source.DeletionFile; -import org.apache.paimon.utils.Pair; import org.apache.paimon.utils.PathFactory; import java.io.DataInputStream; @@ -63,9 +63,9 @@ public DeletionVectorsIndexFile( * @throws UncheckedIOException If an I/O error occurs while reading from the file. */ public Map readAllDeletionVectors(IndexFileMeta fileMeta) { - LinkedHashMap> deletionVectorRanges = - fileMeta.deletionVectorsRanges(); - checkNotNull(deletionVectorRanges); + LinkedHashMap deletionVectorMetas = + fileMeta.deletionVectorMetas(); + checkNotNull(deletionVectorMetas); String indexFileName = fileMeta.fileName(); Map deletionVectors = new HashMap<>(); @@ -73,18 +73,17 @@ public Map readAllDeletionVectors(IndexFileMeta fileMeta try (SeekableInputStream inputStream = fileIO.newInputStream(filePath)) { checkVersion(inputStream); DataInputStream dataInputStream = new DataInputStream(inputStream); - for (Map.Entry> entry : - deletionVectorRanges.entrySet()) { + for (DeletionVectorMeta deletionVectorMeta : deletionVectorMetas.values()) { deletionVectors.put( - entry.getKey(), - readDeletionVector(dataInputStream, entry.getValue().getRight())); + deletionVectorMeta.dataFileName(), + readDeletionVector(dataInputStream, deletionVectorMeta.length())); } } catch (Exception e) { throw new RuntimeException( "Unable to read deletion vectors from file: " + filePath - + ", deletionVectorRanges: " - + deletionVectorRanges, + + ", deletionVectorMetas: " + + 
deletionVectorMetas, e); } return deletionVectors; diff --git a/paimon-core/src/main/java/org/apache/paimon/index/DeletionVectorMeta.java b/paimon-core/src/main/java/org/apache/paimon/index/DeletionVectorMeta.java new file mode 100644 index 000000000000..9eb38818f694 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/index/DeletionVectorMeta.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.index; + +import org.apache.paimon.types.BigIntType; +import org.apache.paimon.types.DataField; +import org.apache.paimon.types.IntType; +import org.apache.paimon.types.RowType; + +import javax.annotation.Nullable; + +import java.util.Objects; + +import static org.apache.paimon.utils.SerializationUtils.newStringType; + +/** Metadata of deletion vector. 
*/ +public class DeletionVectorMeta { + + public static final RowType SCHEMA = + RowType.of( + new DataField(0, "f0", newStringType(false)), + new DataField(1, "f1", new IntType(false)), + new DataField(2, "f2", new IntType(false)), + new DataField(3, "_CARDINALITY", new BigIntType(true))); + + private final String dataFileName; + private final int offset; + private final int length; + @Nullable private final Long cardinality; + + public DeletionVectorMeta( + String dataFileName, int start, int size, @Nullable Long cardinality) { + this.dataFileName = dataFileName; + this.offset = start; + this.length = size; + this.cardinality = cardinality; + } + + public String dataFileName() { + return dataFileName; + } + + public int offset() { + return offset; + } + + public int length() { + return length; + } + + @Nullable + public Long cardinality() { + return cardinality; + } + + @Override + public boolean equals(Object o) { + if (o == null || getClass() != o.getClass()) { + return false; + } + DeletionVectorMeta that = (DeletionVectorMeta) o; + return offset == that.offset + && length == that.length + && Objects.equals(dataFileName, that.dataFileName) + && Objects.equals(cardinality, that.cardinality); + } + + @Override + public int hashCode() { + return Objects.hash(dataFileName, offset, length, cardinality); + } + + @Override + public String toString() { + return "DeletionVectorMeta{" + + "dataFileName='" + + dataFileName + + '\'' + + ", offset=" + + offset + + ", length=" + + length + + ", cardinality=" + + cardinality + + '}'; + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java index 7e5efccdd813..8b0e5c5021f6 100644 --- a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java +++ b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java @@ -100,15 +100,16 @@ public Map scanDVIndex( if 
(meta.indexType().equals(DELETION_VECTORS_INDEX) && file.partition().equals(partition) && file.bucket() == bucket) { - LinkedHashMap> dvRanges = - meta.deletionVectorsRanges(); - checkNotNull(dvRanges); - for (String dataFile : dvRanges.keySet()) { - Pair pair = dvRanges.get(dataFile); - DeletionFile deletionFile = + LinkedHashMap dvMetas = meta.deletionVectorMetas(); + checkNotNull(dvMetas); + for (DeletionVectorMeta dvMeta : dvMetas.values()) { + result.put( + dvMeta.dataFileName(), new DeletionFile( - filePath(meta).toString(), pair.getLeft(), pair.getRight()); - result.put(dataFile, deletionFile); + filePath(meta).toString(), + dvMeta.offset(), + dvMeta.length(), + dvMeta.cardinality())); } } } diff --git a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileMeta.java b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileMeta.java index 24ba6992a5d9..aae4f8c4731b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileMeta.java +++ b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileMeta.java @@ -23,9 +23,7 @@ import org.apache.paimon.types.ArrayType; import org.apache.paimon.types.BigIntType; import org.apache.paimon.types.DataField; -import org.apache.paimon.types.IntType; import org.apache.paimon.types.RowType; -import org.apache.paimon.utils.Pair; import javax.annotation.Nullable; @@ -54,12 +52,7 @@ public class IndexFileMeta { new DataField( 4, "_DELETIONS_VECTORS_RANGES", - new ArrayType( - true, - RowType.of( - newStringType(false), - new IntType(false), - new IntType(false)))))); + new ArrayType(true, DeletionVectorMeta.SCHEMA)))); private final String indexType; private final String fileName; @@ -68,9 +61,9 @@ public class IndexFileMeta { /** * Metadata only used by {@link DeletionVectorsIndexFile}, use LinkedHashMap to ensure that the - * order of DeletionVectorRanges and the written DeletionVectors is consistent. + * order of DeletionVectorMetas and the written DeletionVectors is consistent. 
*/ - private final @Nullable LinkedHashMap> deletionVectorsRanges; + private final @Nullable LinkedHashMap deletionVectorMetas; public IndexFileMeta(String indexType, String fileName, long fileSize, long rowCount) { this(indexType, fileName, fileSize, rowCount, null); @@ -81,12 +74,12 @@ public IndexFileMeta( String fileName, long fileSize, long rowCount, - @Nullable LinkedHashMap> deletionVectorsRanges) { + @Nullable LinkedHashMap deletionVectorMetas) { this.indexType = indexType; this.fileName = fileName; this.fileSize = fileSize; this.rowCount = rowCount; - this.deletionVectorsRanges = deletionVectorsRanges; + this.deletionVectorMetas = deletionVectorMetas; } public String indexType() { @@ -105,8 +98,8 @@ public long rowCount() { return rowCount; } - public @Nullable LinkedHashMap> deletionVectorsRanges() { - return deletionVectorsRanges; + public @Nullable LinkedHashMap deletionVectorMetas() { + return deletionVectorMetas; } @Override @@ -122,12 +115,12 @@ public boolean equals(Object o) { && Objects.equals(fileName, that.fileName) && fileSize == that.fileSize && rowCount == that.rowCount - && Objects.equals(deletionVectorsRanges, that.deletionVectorsRanges); + && Objects.equals(deletionVectorMetas, that.deletionVectorMetas); } @Override public int hashCode() { - return Objects.hash(indexType, fileName, fileSize, rowCount, deletionVectorsRanges); + return Objects.hash(indexType, fileName, fileSize, rowCount, deletionVectorMetas); } @Override @@ -142,8 +135,8 @@ public String toString() { + fileSize + ", rowCount=" + rowCount - + ", deletionVectorsRanges=" - + deletionVectorsRanges + + ", deletionVectorMetas=" + + deletionVectorMetas + '}'; } } diff --git a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileMeta09Serializer.java b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileMeta09Serializer.java new file mode 100644 index 000000000000..915d904569d7 --- /dev/null +++ 
b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileMeta09Serializer.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.index; + +import org.apache.paimon.data.InternalArray; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.data.serializer.InternalRowSerializer; +import org.apache.paimon.data.serializer.InternalSerializers; +import org.apache.paimon.io.DataInputView; +import org.apache.paimon.types.ArrayType; +import org.apache.paimon.types.BigIntType; +import org.apache.paimon.types.DataField; +import org.apache.paimon.types.IntType; +import org.apache.paimon.types.RowType; + +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.LinkedHashMap; +import java.util.List; + +import static org.apache.paimon.utils.SerializationUtils.newStringType; + +/** Serializer for {@link IndexFileMeta} with 0.9 version. 
*/ +public class IndexFileMeta09Serializer implements Serializable { + + private static final long serialVersionUID = 1L; + + public static final RowType SCHEMA = + new RowType( + false, + Arrays.asList( + new DataField(0, "_INDEX_TYPE", newStringType(false)), + new DataField(1, "_FILE_NAME", newStringType(false)), + new DataField(2, "_FILE_SIZE", new BigIntType(false)), + new DataField(3, "_ROW_COUNT", new BigIntType(false)), + new DataField( + 4, + "_DELETIONS_VECTORS_RANGES", + new ArrayType( + true, + RowType.of( + newStringType(false), + new IntType(false), + new IntType(false)))))); + + protected final InternalRowSerializer rowSerializer; + + public IndexFileMeta09Serializer() { + this.rowSerializer = InternalSerializers.create(SCHEMA); + } + + public IndexFileMeta fromRow(InternalRow row) { + return new IndexFileMeta( + row.getString(0).toString(), + row.getString(1).toString(), + row.getLong(2), + row.getLong(3), + row.isNullAt(4) ? null : rowArrayDataToDvMetas(row.getArray(4))); + } + + public final List deserializeList(DataInputView source) throws IOException { + int size = source.readInt(); + List records = new ArrayList<>(size); + for (int i = 0; i < size; i++) { + records.add(deserialize(source)); + } + return records; + } + + public IndexFileMeta deserialize(DataInputView in) throws IOException { + return fromRow(rowSerializer.deserialize(in)); + } + + public static LinkedHashMap rowArrayDataToDvMetas( + InternalArray arrayData) { + LinkedHashMap dvMetas = new LinkedHashMap<>(arrayData.size()); + for (int i = 0; i < arrayData.size(); i++) { + InternalRow row = arrayData.getRow(i, 3); + dvMetas.put( + row.getString(0).toString(), + new DeletionVectorMeta( + row.getString(0).toString(), row.getInt(1), row.getInt(2), null)); + } + return dvMetas; + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileMetaSerializer.java b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileMetaSerializer.java index 
4b52932623f2..db4a44838fbf 100644 --- a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileMetaSerializer.java +++ b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileMetaSerializer.java @@ -24,9 +24,9 @@ import org.apache.paimon.data.InternalArray; import org.apache.paimon.data.InternalRow; import org.apache.paimon.utils.ObjectSerializer; -import org.apache.paimon.utils.Pair; import org.apache.paimon.utils.VersionedObjectSerializer; +import java.util.Collection; import java.util.LinkedHashMap; /** A {@link VersionedObjectSerializer} for {@link IndexFileMeta}. */ @@ -43,9 +43,9 @@ public InternalRow toRow(IndexFileMeta record) { BinaryString.fromString(record.fileName()), record.fileSize(), record.rowCount(), - record.deletionVectorsRanges() == null + record.deletionVectorMetas() == null ? null - : dvRangesToRowArrayData(record.deletionVectorsRanges())); + : dvMetasToRowArrayData(record.deletionVectorMetas().values())); } @Override @@ -55,30 +55,35 @@ public IndexFileMeta fromRow(InternalRow row) { row.getString(1).toString(), row.getLong(2), row.getLong(3), - row.isNullAt(4) ? null : rowArrayDataToDvRanges(row.getArray(4))); + row.isNullAt(4) ? 
null : rowArrayDataToDvMetas(row.getArray(4))); } - public static InternalArray dvRangesToRowArrayData( - LinkedHashMap> dvRanges) { + public static InternalArray dvMetasToRowArrayData(Collection dvMetas) { return new GenericArray( - dvRanges.entrySet().stream() + dvMetas.stream() .map( - entry -> + dvMeta -> GenericRow.of( - BinaryString.fromString(entry.getKey()), - entry.getValue().getLeft(), - entry.getValue().getRight())) + BinaryString.fromString(dvMeta.dataFileName()), + dvMeta.offset(), + dvMeta.length(), + dvMeta.cardinality())) .toArray(GenericRow[]::new)); } - public static LinkedHashMap> rowArrayDataToDvRanges( + public static LinkedHashMap rowArrayDataToDvMetas( InternalArray arrayData) { - LinkedHashMap> dvRanges = - new LinkedHashMap<>(arrayData.size()); + LinkedHashMap dvMetas = new LinkedHashMap<>(arrayData.size()); for (int i = 0; i < arrayData.size(); i++) { - InternalRow row = arrayData.getRow(i, 3); - dvRanges.put(row.getString(0).toString(), Pair.of(row.getInt(1), row.getInt(2))); + InternalRow row = arrayData.getRow(i, DeletionVectorMeta.SCHEMA.getFieldCount()); + dvMetas.put( + row.getString(0).toString(), + new DeletionVectorMeta( + row.getString(0).toString(), + row.getInt(1), + row.getInt(2), + row.isNullAt(3) ? 
null : row.getLong(3))); } - return dvRanges; + return dvMetas; } } diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestEntry.java b/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestEntry.java index a52d9e8af40f..2431a1c26412 100644 --- a/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestEntry.java +++ b/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestEntry.java @@ -20,6 +20,7 @@ import org.apache.paimon.annotation.Public; import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.index.DeletionVectorMeta; import org.apache.paimon.index.IndexFileMeta; import org.apache.paimon.types.ArrayType; import org.apache.paimon.types.BigIntType; @@ -57,12 +58,7 @@ public class IndexManifestEntry { new DataField( 7, "_DELETIONS_VECTORS_RANGES", - new ArrayType( - true, - RowType.of( - newStringType(false), - new IntType(false), - new IntType(false)))))); + new ArrayType(true, DeletionVectorMeta.SCHEMA)))); private final FileKind kind; private final BinaryRow partition; diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestEntrySerializer.java b/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestEntrySerializer.java index 574e935550eb..0cb755b051b0 100644 --- a/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestEntrySerializer.java +++ b/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestEntrySerializer.java @@ -22,16 +22,17 @@ import org.apache.paimon.data.GenericRow; import org.apache.paimon.data.InternalRow; import org.apache.paimon.index.IndexFileMeta; +import org.apache.paimon.index.IndexFileMetaSerializer; import org.apache.paimon.utils.VersionedObjectSerializer; -import static org.apache.paimon.index.IndexFileMetaSerializer.dvRangesToRowArrayData; -import static org.apache.paimon.index.IndexFileMetaSerializer.rowArrayDataToDvRanges; import static org.apache.paimon.utils.SerializationUtils.deserializeBinaryRow; import 
static org.apache.paimon.utils.SerializationUtils.serializeBinaryRow; /** A {@link VersionedObjectSerializer} for {@link IndexManifestEntry}. */ public class IndexManifestEntrySerializer extends VersionedObjectSerializer { + private static final long serialVersionUID = 2L; + public IndexManifestEntrySerializer() { super(IndexManifestEntry.SCHEMA); } @@ -52,9 +53,10 @@ public InternalRow convertTo(IndexManifestEntry record) { BinaryString.fromString(indexFile.fileName()), indexFile.fileSize(), indexFile.rowCount(), - record.indexFile().deletionVectorsRanges() == null + record.indexFile().deletionVectorMetas() == null ? null - : dvRangesToRowArrayData(record.indexFile().deletionVectorsRanges())); + : IndexFileMetaSerializer.dvMetasToRowArrayData( + record.indexFile().deletionVectorMetas().values())); } @Override @@ -72,6 +74,8 @@ public IndexManifestEntry convertFrom(int version, InternalRow row) { row.getString(4).toString(), row.getLong(5), row.getLong(6), - row.isNullAt(7) ? null : rowArrayDataToDvRanges(row.getArray(7)))); + row.isNullAt(7) + ? 
null + : IndexFileMetaSerializer.rowArrayDataToDvMetas(row.getArray(7)))); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java index 7918914b2c63..800208beda78 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java @@ -19,6 +19,8 @@ package org.apache.paimon.table.sink; import org.apache.paimon.data.serializer.VersionedSerializer; +import org.apache.paimon.index.IndexFileMeta; +import org.apache.paimon.index.IndexFileMeta09Serializer; import org.apache.paimon.index.IndexFileMetaSerializer; import org.apache.paimon.io.CompactIncrement; import org.apache.paimon.io.DataFileMeta; @@ -50,7 +52,9 @@ public class CommitMessageSerializer implements VersionedSerializer deserializeList(int version, DataInputView view) thro } private CommitMessage deserialize(int version, DataInputView view) throws IOException { - if (version >= 3) { - IOExceptionSupplier> fileDeserializer = - () -> dataFileSerializer.deserializeList(view); - if (version == 3) { - DataFileMeta09Serializer serializer = new DataFileMeta09Serializer(); - fileDeserializer = () -> serializer.deserializeList(view); - } - return new CommitMessageImpl( - deserializeBinaryRow(view), - view.readInt(), - new DataIncrement( - fileDeserializer.get(), fileDeserializer.get(), fileDeserializer.get()), - new CompactIncrement( - fileDeserializer.get(), fileDeserializer.get(), fileDeserializer.get()), - new IndexIncrement( - indexEntrySerializer.deserializeList(view), - indexEntrySerializer.deserializeList(view))); - } else { - return deserialize08(version, view); - } - } - - private CommitMessage deserialize08(int version, DataInputView view) throws IOException { - if (dataFile08Serializer == null) { - dataFile08Serializer = new DataFileMeta08Serializer(); - } + 
IOExceptionSupplier> fileDeserializer = fileDeserializer(version, view); + IOExceptionSupplier> indexEntryDeserializer = + indexEntryDeserializer(version, view); return new CommitMessageImpl( deserializeBinaryRow(view), view.readInt(), new DataIncrement( - dataFile08Serializer.deserializeList(view), - dataFile08Serializer.deserializeList(view), - dataFile08Serializer.deserializeList(view)), + fileDeserializer.get(), fileDeserializer.get(), fileDeserializer.get()), new CompactIncrement( - dataFile08Serializer.deserializeList(view), - dataFile08Serializer.deserializeList(view), - dataFile08Serializer.deserializeList(view)), + fileDeserializer.get(), fileDeserializer.get(), fileDeserializer.get()), new IndexIncrement( - indexEntrySerializer.deserializeList(view), - version <= 2 - ? Collections.emptyList() - : indexEntrySerializer.deserializeList(view))); + indexEntryDeserializer.get(), + version <= 2 ? Collections.emptyList() : indexEntryDeserializer.get())); + } + + private IOExceptionSupplier> fileDeserializer( + int version, DataInputView view) { + if (version > 3) { + return () -> dataFileSerializer.deserializeList(view); + } else if (version == 3) { + if (dataFile09Serializer == null) { + dataFile09Serializer = new DataFileMeta09Serializer(); + } + return () -> dataFile09Serializer.deserializeList(view); + } else { + if (dataFile08Serializer == null) { + dataFile08Serializer = new DataFileMeta08Serializer(); + } + return () -> dataFile08Serializer.deserializeList(view); + } + } + + private IOExceptionSupplier> indexEntryDeserializer( + int version, DataInputView view) { + if (version > 3) { + return () -> indexEntrySerializer.deserializeList(view); + } else { + if (indexEntry09Serializer == null) { + indexEntry09Serializer = new IndexFileMeta09Serializer(); + } + return () -> indexEntry09Serializer.deserializeList(view); + } } } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/DeletionFile.java 
b/paimon-core/src/main/java/org/apache/paimon/table/source/DeletionFile.java index 94dfc615729c..8333f33cc1d3 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/DeletionFile.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DeletionFile.java @@ -47,16 +47,18 @@ @Public public class DeletionFile implements Serializable { - private static final long serialVersionUID = 1L; + private static final long serialVersionUID = 2L; private final String path; private final long offset; private final long length; + @Nullable private final Long cardinality; - public DeletionFile(String path, long offset, long length) { + public DeletionFile(String path, long offset, long length, @Nullable Long cardinality) { this.path = path; this.offset = offset; this.length = length; + this.cardinality = cardinality; } /** Path of the file. */ @@ -74,6 +76,12 @@ public long length() { return length; } + /** the number of deleted rows. */ + @Nullable + public Long cardinality() { + return cardinality; + } + public static void serialize(DataOutputView out, @Nullable DeletionFile file) throws IOException { if (file == null) { @@ -83,6 +91,7 @@ public static void serialize(DataOutputView out, @Nullable DeletionFile file) out.writeUTF(file.path); out.writeLong(file.offset); out.writeLong(file.length); + out.writeLong(file.cardinality == null ? -1 : file.cardinality); } } @@ -108,7 +117,8 @@ public static DeletionFile deserialize(DataInputView in) throws IOException { String path = in.readUTF(); long offset = in.readLong(); long length = in.readLong(); - return new DeletionFile(path, offset, length); + long cardinality = in.readLong(); + return new DeletionFile(path, offset, length, cardinality == -1 ? 
null : cardinality); } @Nullable @@ -126,22 +136,34 @@ public static List deserializeList(DataInputView in) throws IOExce @Override public boolean equals(Object o) { - if (!(o instanceof DeletionFile)) { + if (o == null || getClass() != o.getClass()) { return false; } - - DeletionFile other = (DeletionFile) o; - return Objects.equals(path, other.path) && offset == other.offset && length == other.length; + DeletionFile that = (DeletionFile) o; + return offset == that.offset + && length == that.length + && Objects.equals(path, that.path) + && Objects.equals(cardinality, that.cardinality); } @Override public int hashCode() { - return Objects.hash(path, offset, length); + return Objects.hash(path, offset, length, cardinality); } @Override public String toString() { - return String.format("{path = %s, offset = %d, length = %d}", path, offset, length); + return "DeletionFile{" + + "path='" + + path + + '\'' + + ", offset=" + + offset + + ", length=" + + length + + ", cardinality=" + + cardinality + + '}'; } static Factory emptyFactory() { diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java index ce01bdba9447..bf19ba10c689 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java @@ -24,6 +24,7 @@ import org.apache.paimon.codegen.RecordComparator; import org.apache.paimon.consumer.ConsumerManager; import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.index.DeletionVectorMeta; import org.apache.paimon.index.IndexFileHandler; import org.apache.paimon.index.IndexFileMeta; import org.apache.paimon.io.DataFileMeta; @@ -492,23 +493,24 @@ private List getDeletionFiles( List deletionFiles = new ArrayList<>(dataFiles.size()); Map dataFileToIndexFileMeta = new HashMap<>(); for 
(IndexFileMeta indexFileMeta : indexFileMetas) { - if (indexFileMeta.deletionVectorsRanges() != null) { - for (String dataFileName : indexFileMeta.deletionVectorsRanges().keySet()) { - dataFileToIndexFileMeta.put(dataFileName, indexFileMeta); + if (indexFileMeta.deletionVectorMetas() != null) { + for (DeletionVectorMeta dvMeta : indexFileMeta.deletionVectorMetas().values()) { + dataFileToIndexFileMeta.put(dvMeta.dataFileName(), indexFileMeta); } } } for (DataFileMeta file : dataFiles) { IndexFileMeta indexFileMeta = dataFileToIndexFileMeta.get(file.fileName()); if (indexFileMeta != null) { - Map> ranges = indexFileMeta.deletionVectorsRanges(); - if (ranges != null && ranges.containsKey(file.fileName())) { - Pair range = ranges.get(file.fileName()); + LinkedHashMap dvMetas = + indexFileMeta.deletionVectorMetas(); + if (dvMetas != null && dvMetas.containsKey(file.fileName())) { deletionFiles.add( new DeletionFile( indexFileHandler.filePath(indexFileMeta).toString(), - range.getKey(), - range.getValue())); + dvMetas.get(file.fileName()).offset(), + dvMetas.get(file.fileName()).length(), + dvMetas.get(file.fileName()).cardinality())); continue; } } diff --git a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/append/AppendDeletionFileMaintainerTest.java b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/append/AppendDeletionFileMaintainerTest.java index 6c674352b8d3..a52819c80515 100644 --- a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/append/AppendDeletionFileMaintainerTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/append/AppendDeletionFileMaintainerTest.java @@ -23,12 +23,12 @@ import org.apache.paimon.deletionvectors.DeletionVector; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.local.LocalFileIO; +import org.apache.paimon.index.DeletionVectorMeta; import org.apache.paimon.index.IndexFileMeta; import org.apache.paimon.manifest.FileKind; import 
org.apache.paimon.manifest.IndexManifestEntry; import org.apache.paimon.table.sink.CommitMessageImpl; import org.apache.paimon.table.source.DeletionFile; -import org.apache.paimon.utils.Pair; import org.apache.paimon.utils.PathFactory; import org.junit.jupiter.api.Test; @@ -94,7 +94,7 @@ public void test() throws Exception { assertThat(res.size()).isEqualTo(3); IndexManifestEntry entry = res.stream().filter(file -> file.kind() == FileKind.ADD).findAny().get(); - assertThat(entry.indexFile().deletionVectorsRanges().containsKey("f2")).isTrue(); + assertThat(entry.indexFile().deletionVectorMetas().containsKey("f2")).isTrue(); entry = res.stream() .filter(file -> file.kind() == FileKind.DELETE) @@ -117,14 +117,15 @@ private Map createDeletionFileMapFromIndexFileMetas( PathFactory indexPathFactory, List fileMetas) { Map dataFileToDeletionFiles = new HashMap<>(); for (IndexFileMeta indexFileMeta : fileMetas) { - for (Map.Entry> range : - indexFileMeta.deletionVectorsRanges().entrySet()) { + for (Map.Entry dvMeta : + indexFileMeta.deletionVectorMetas().entrySet()) { dataFileToDeletionFiles.put( - range.getKey(), + dvMeta.getKey(), new DeletionFile( indexPathFactory.toPath(indexFileMeta.fileName()).toString(), - range.getValue().getLeft(), - range.getValue().getRight())); + dvMeta.getValue().offset(), + dvMeta.getValue().length(), + dvMeta.getValue().cardinality())); } } return dataFileToDeletionFiles; diff --git a/paimon-core/src/test/java/org/apache/paimon/index/IndexFileMetaSerializerTest.java b/paimon-core/src/test/java/org/apache/paimon/index/IndexFileMetaSerializerTest.java index 724d5b416359..a7e692d2e554 100644 --- a/paimon-core/src/test/java/org/apache/paimon/index/IndexFileMetaSerializerTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/index/IndexFileMetaSerializerTest.java @@ -21,7 +21,6 @@ import org.apache.paimon.deletionvectors.DeletionVectorsIndexFile; import org.apache.paimon.utils.ObjectSerializer; import 
org.apache.paimon.utils.ObjectSerializerTestBase; -import org.apache.paimon.utils.Pair; import java.util.LinkedHashMap; import java.util.Random; @@ -59,14 +58,20 @@ public static IndexFileMeta randomHashIndexFile() { public static IndexFileMeta randomDeletionVectorIndexFile() { Random rnd = new Random(); - LinkedHashMap> deletionVectorsRanges = new LinkedHashMap<>(); - deletionVectorsRanges.put("my_file_name1", Pair.of(rnd.nextInt(), rnd.nextInt())); - deletionVectorsRanges.put("my_file_name2", Pair.of(rnd.nextInt(), rnd.nextInt())); + LinkedHashMap deletionVectorMetas = new LinkedHashMap<>(); + deletionVectorMetas.put( + "my_file_name1", + new DeletionVectorMeta( + "my_file_name1", rnd.nextInt(), rnd.nextInt(), rnd.nextLong())); + deletionVectorMetas.put( + "my_file_name2", + new DeletionVectorMeta( + "my_file_name2", rnd.nextInt(), rnd.nextInt(), rnd.nextLong())); return new IndexFileMeta( DeletionVectorsIndexFile.DELETION_VECTORS_INDEX, "deletion_vectors_index_file_name" + rnd.nextLong(), rnd.nextInt(), rnd.nextInt(), - deletionVectorsRanges); + deletionVectorMetas); } } diff --git a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerCompatibilityTest.java b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerCompatibilityTest.java index bd272b745dc4..0c312c4d4998 100644 --- a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerCompatibilityTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerCompatibilityTest.java @@ -19,6 +19,7 @@ package org.apache.paimon.manifest; import org.apache.paimon.data.Timestamp; +import org.apache.paimon.index.DeletionVectorMeta; import org.apache.paimon.index.IndexFileMeta; import org.apache.paimon.io.CompactIncrement; import org.apache.paimon.io.DataFileMeta; @@ -27,7 +28,6 @@ import org.apache.paimon.stats.SimpleStats; import org.apache.paimon.table.sink.CommitMessageImpl; import 
org.apache.paimon.utils.IOUtils; -import org.apache.paimon.utils.Pair; import org.junit.jupiter.api.Test; @@ -78,11 +78,11 @@ public void testProduction() throws IOException { Arrays.asList("field1", "field2", "field3")); List dataFiles = Collections.singletonList(dataFile); - LinkedHashMap> dvRanges = new LinkedHashMap<>(); - dvRanges.put("dv_key1", Pair.of(1, 2)); - dvRanges.put("dv_key2", Pair.of(3, 4)); + LinkedHashMap dvMetas = new LinkedHashMap<>(); + dvMetas.put("dv_key1", new DeletionVectorMeta("dv_key1", 1, 2, 3L)); + dvMetas.put("dv_key2", new DeletionVectorMeta("dv_key2", 3, 4, 5L)); IndexFileMeta indexFile = - new IndexFileMeta("my_index_type", "my_index_file", 1024 * 100, 1002, dvRanges); + new IndexFileMeta("my_index_type", "my_index_file", 1024 * 100, 1002, dvMetas); List indexFiles = Collections.singletonList(indexFile); CommitMessageImpl commitMessage = @@ -139,11 +139,11 @@ public void testCompatibilityToVersion3() throws IOException { null); List dataFiles = Collections.singletonList(dataFile); - LinkedHashMap> dvRanges = new LinkedHashMap<>(); - dvRanges.put("dv_key1", Pair.of(1, 2)); - dvRanges.put("dv_key2", Pair.of(3, 4)); + LinkedHashMap dvMetas = new LinkedHashMap<>(); + dvMetas.put("dv_key1", new DeletionVectorMeta("dv_key1", 1, 2, null)); + dvMetas.put("dv_key2", new DeletionVectorMeta("dv_key2", 3, 4, null)); IndexFileMeta indexFile = - new IndexFileMeta("my_index_type", "my_index_file", 1024 * 100, 1002, dvRanges); + new IndexFileMeta("my_index_type", "my_index_file", 1024 * 100, 1002, dvMetas); List indexFiles = Collections.singletonList(indexFile); CommitMessageImpl commitMessage = @@ -209,11 +209,11 @@ public void testCompatibilityToVersion2() throws IOException { null); List dataFiles = Collections.singletonList(dataFile); - LinkedHashMap> dvRanges = new LinkedHashMap<>(); - dvRanges.put("dv_key1", Pair.of(1, 2)); - dvRanges.put("dv_key2", Pair.of(3, 4)); + LinkedHashMap dvMetas = new LinkedHashMap<>(); + dvMetas.put("dv_key1", new 
DeletionVectorMeta("dv_key1", 1, 2, null)); + dvMetas.put("dv_key2", new DeletionVectorMeta("dv_key2", 3, 4, null)); IndexFileMeta indexFile = - new IndexFileMeta("my_index_type", "my_index_file", 1024 * 100, 1002, dvRanges); + new IndexFileMeta("my_index_type", "my_index_file", 1024 * 100, 1002, dvMetas); List indexFiles = Collections.singletonList(indexFile); CommitMessageImpl commitMessage = diff --git a/paimon-core/src/test/java/org/apache/paimon/table/sink/CommitMessageSerializerTest.java b/paimon-core/src/test/java/org/apache/paimon/table/sink/CommitMessageSerializerTest.java index eb9105189b71..a94f45b67378 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/sink/CommitMessageSerializerTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/sink/CommitMessageSerializerTest.java @@ -48,7 +48,7 @@ public void test() throws IOException { CommitMessageImpl committable = new CommitMessageImpl(row(0), 1, dataIncrement, compactIncrement, indexIncrement); CommitMessageImpl newCommittable = - (CommitMessageImpl) serializer.deserialize(3, serializer.serialize(committable)); + (CommitMessageImpl) serializer.deserialize(4, serializer.serialize(committable)); assertThat(newCommittable.compactIncrement()).isEqualTo(committable.compactIncrement()); assertThat(newCommittable.newFilesIncrement()).isEqualTo(committable.newFilesIncrement()); assertThat(newCommittable.indexIncrement()).isEqualTo(committable.indexIncrement()); diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DeletionVectorTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DeletionVectorTest.scala index ea8309e14ffe..46a423b9d699 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DeletionVectorTest.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DeletionVectorTest.scala @@ -631,6 +631,27 @@ class DeletionVectorTest extends PaimonSparkTestBase with 
AdaptiveSparkPlanHelpe ) } + test("Paimon deletionVector: get cardinality") { + sql(s""" + |CREATE TABLE T (id INT) + |TBLPROPERTIES ( + | 'deletion-vectors.enabled' = 'true', + | 'bucket-key' = 'id', + | 'bucket' = '1' + |) + |""".stripMargin) + + sql("INSERT INTO T SELECT /*+ REPARTITION(1) */ id FROM range (1, 50000)") + sql("DELETE FROM T WHERE id >= 111 and id <= 444") + + val fileStore = loadTable("T").store() + val indexManifest = fileStore.snapshotManager().latestSnapshot().indexManifest() + val entry = fileStore.newIndexFileHandler().readManifest(indexManifest).get(0) + val dvMeta = entry.indexFile().deletionVectorMetas().values().iterator().next() + + assert(dvMeta.cardinality() == 334) + } + private def getPathName(path: String): String = { new Path(path).getName }