diff --git a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVector.java b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVector.java index be20d3891410..60d69a436409 100644 --- a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVector.java +++ b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVector.java @@ -115,7 +115,9 @@ static DeletionVector read(FileIO fileIO, DeletionFile deletionFile) throws IOEx "Size not match, actual size: " + actualLength + ", expert size: " - + deletionFile.length()); + + deletionFile.length() + + ", file path: " + + path); } int magicNum = dis.readInt(); if (magicNum == BitmapDeletionVector.MAGIC_NUMBER) { diff --git a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFile.java b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFile.java index 313435928505..733fb7825440 100644 --- a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFile.java +++ b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFile.java @@ -57,10 +57,10 @@ public DeletionVectorsIndexFile(FileIO fileIO, PathFactory pathFactory) { * @throws UncheckedIOException If an I/O error occurs while reading from the file. */ public Map readAllDeletionVectors( - String fileName, Map> deletionVectorRanges) { + String fileName, LinkedHashMap> deletionVectorRanges) { Map deletionVectors = new HashMap<>(); - try (SeekableInputStream inputStream = - fileIO.newInputStream(pathFactory.toPath(fileName))) { + Path filePath = pathFactory.toPath(fileName); + try (SeekableInputStream inputStream = fileIO.newInputStream(filePath)) { checkVersion(inputStream); DataInputStream dataInputStream = new DataInputStream(inputStream); for (Map.Entry> entry : @@ -69,9 +69,13 @@ public Map readAllDeletionVectors( entry.getKey(), readDeletionVector(dataInputStream, entry.getValue().getRight())); } - } catch (IOException e) { - throw new UncheckedIOException( - "Unable to read deletion vectors from file: " + fileName, e); + } catch (Exception e) { + throw new RuntimeException( + "Unable to read deletion vectors from file: " + + filePath + + ", deletionVectorRanges: " + + deletionVectorRanges, + e); } return deletionVectors; } @@ -87,15 +91,19 @@ public Map readAllDeletionVectors( */ public DeletionVector readDeletionVector( String fileName, Pair deletionVectorRange) { - try (SeekableInputStream inputStream = - fileIO.newInputStream(pathFactory.toPath(fileName))) { + Path filePath = pathFactory.toPath(fileName); + try (SeekableInputStream inputStream = fileIO.newInputStream(filePath)) { checkVersion(inputStream); inputStream.seek(deletionVectorRange.getLeft()); DataInputStream dataInputStream = new DataInputStream(inputStream); return readDeletionVector(dataInputStream, deletionVectorRange.getRight()); - } catch (IOException e) { - throw new UncheckedIOException( - "Unable to read deletion vector from file: " + fileName, e); + } catch (Exception e) { + throw new RuntimeException( + "Unable to read deletion vector from file: " + + filePath + + ", deletionVectorRange: " + + deletionVectorRange, + e); } } @@ -111,12 +119,11 @@ public DeletionVector readDeletionVector( * data is located. * @throws UncheckedIOException If an I/O error occurs while writing to the file. */ - public Pair>> write( + public Pair>> write( Map input) { int size = input.size(); - // use LinkedHashMap to ensure that the order of DeletionVectorRanges and the written - // DeletionVectors is consistent. - Map> deletionVectorRanges = new LinkedHashMap<>(size); + LinkedHashMap> deletionVectorRanges = + new LinkedHashMap<>(size); Path path = pathFactory.newPath(); try (DataOutputStream dataOutputStream = new DataOutputStream(fileIO.newOutputStream(path, true))) { @@ -158,11 +165,7 @@ private DeletionVector readDeletionVector(DataInputStream inputStream, int size) // read DeletionVector bytes byte[] bytes = new byte[size]; - int readSize = inputStream.read(bytes); - if (readSize != size) { - throw new RuntimeException( - "Size not match, actual size: " + readSize + ", expert size: " + size); - } + inputStream.readFully(bytes); // check checksum int checkSum = calculateChecksum(bytes); diff --git a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java index 97cdbf43e781..fdc2fe754ff8 100644 --- a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java +++ b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java @@ -34,6 +34,7 @@ import java.io.UncheckedIOException; import java.util.ArrayList; import java.util.Collections; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Optional; @@ -171,7 +172,8 @@ public Map readAllDeletionVectors(IndexFileMeta fileMeta throw new IllegalArgumentException( "Input file is not deletion vectors index " + fileMeta.indexType()); } - Map> deleteIndexRange = fileMeta.deletionVectorsRanges(); + LinkedHashMap> deleteIndexRange = + fileMeta.deletionVectorsRanges(); if (deleteIndexRange == null || deleteIndexRange.isEmpty()) { return Collections.emptyMap(); } @@ -193,7 +195,7 @@ public Optional readDeletionVector(IndexFileMeta fileMeta, Strin } public IndexFileMeta writeDeletionVectorsIndex(Map deletionVectors) { - Pair>> pair = + Pair>> pair = deletionVectorsIndex.write(deletionVectors); return new IndexFileMeta( DELETION_VECTORS_INDEX, diff --git a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileMeta.java b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileMeta.java index 1b3f405fe869..b6afd0c44cb3 100644 --- a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileMeta.java +++ b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileMeta.java @@ -29,8 +29,8 @@ import javax.annotation.Nullable; import java.util.ArrayList; +import java.util.LinkedHashMap; import java.util.List; -import java.util.Map; import java.util.Objects; import static org.apache.paimon.utils.SerializationUtils.newStringType; @@ -43,8 +43,11 @@ public class IndexFileMeta { private final long fileSize; private final long rowCount; - /** Metadata only used by {@link DeletionVectorsIndexFile}. */ - private final @Nullable Map> deletionVectorsRanges; + /** + * Metadata only used by {@link DeletionVectorsIndexFile}, use LinkedHashMap to ensure that the + * order of DeletionVectorRanges and the written DeletionVectors is consistent. + */ + private final @Nullable LinkedHashMap> deletionVectorsRanges; public IndexFileMeta(String indexType, String fileName, long fileSize, long rowCount) { this(indexType, fileName, fileSize, rowCount, null); @@ -55,7 +58,7 @@ public IndexFileMeta( String fileName, long fileSize, long rowCount, - @Nullable Map> deletionVectorsRanges) { + @Nullable LinkedHashMap> deletionVectorsRanges) { this.indexType = indexType; this.fileName = fileName; this.fileSize = fileSize; @@ -79,7 +82,7 @@ public long rowCount() { return rowCount; } - public @Nullable Map> deletionVectorsRanges() { + public @Nullable LinkedHashMap> deletionVectorsRanges() { return deletionVectorsRanges; } diff --git a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileMetaSerializer.java b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileMetaSerializer.java index 005868540e46..e9d82af46e8e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileMetaSerializer.java +++ b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileMetaSerializer.java @@ -28,7 +28,6 @@ import org.apache.paimon.utils.VersionedObjectSerializer; import java.util.LinkedHashMap; -import java.util.Map; /** A {@link VersionedObjectSerializer} for {@link IndexFileMeta}. */ public class IndexFileMetaSerializer extends ObjectSerializer { @@ -60,7 +59,7 @@ public IndexFileMeta fromRow(InternalRow row) { } public static InternalArray dvRangesToRowArrayData( - Map> dvRanges) { + LinkedHashMap> dvRanges) { return new GenericArray( dvRanges.entrySet().stream() .map( @@ -72,9 +71,10 @@ public static InternalArray dvRangesToRowArrayData( .toArray(GenericRow[]::new)); } - public static Map> rowArrayDataToDvRanges( + public static LinkedHashMap> rowArrayDataToDvRanges( InternalArray arrayData) { - Map> dvRanges = new LinkedHashMap<>(arrayData.size()); + LinkedHashMap> dvRanges = + new LinkedHashMap<>(arrayData.size()); for (int i = 0; i < arrayData.size(); i++) { InternalRow row = arrayData.getRow(i, 3); dvRanges.put(row.getString(0).toString(), Pair.of(row.getInt(1), row.getInt(2))); diff --git a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFileTest.java b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFileTest.java index 228508f33d7a..fbaa407d2b30 100644 --- a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFileTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFileTest.java @@ -27,7 +27,9 @@ import org.junit.jupiter.api.io.TempDir; import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.Map; +import java.util.Random; import java.util.UUID; import static org.assertj.core.api.Assertions.assertThat; @@ -37,20 +39,8 @@ public class DeletionVectorsIndexFileTest { @TempDir java.nio.file.Path tempPath; @Test - public void test0() { - Path dir = new Path(tempPath.toUri()); - PathFactory pathFactory = - new PathFactory() { - @Override - public Path newPath() { - return new Path(dir, UUID.randomUUID().toString()); - } - - @Override - public Path toPath(String fileName) { - return new Path(dir, fileName); - } - }; + public void testReadDvIndex() { + PathFactory pathFactory = getPathFactory(); DeletionVectorsIndexFile deletionVectorsIndexFile = new DeletionVectorsIndexFile(LocalFileIO.create(), pathFactory); @@ -70,10 +60,10 @@ public Path toPath(String fileName) { index3.delete(3); deleteMap.put("file33.parquet", index3); - Pair>> pair = + Pair>> pair = deletionVectorsIndexFile.write(deleteMap); String fileName = pair.getLeft(); - Map> deletionVectorRanges = pair.getRight(); + LinkedHashMap> deletionVectorRanges = pair.getRight(); // read Map actualDeleteMap = @@ -94,4 +84,67 @@ public Path toPath(String fileName) { deletionVectorsIndexFile.delete(fileName); assertThat(deletionVectorsIndexFile.exists(fileName)).isFalse(); } + + @Test + public void testReadDvIndexWithCopiousDv() { + PathFactory pathFactory = getPathFactory(); + DeletionVectorsIndexFile deletionVectorsIndexFile = + new DeletionVectorsIndexFile(LocalFileIO.create(), pathFactory); + + // write + Random random = new Random(); + HashMap deleteMap = new HashMap<>(); + for (int i = 0; i < 100000; i++) { + BitmapDeletionVector index = new BitmapDeletionVector(); + index.delete(random.nextInt(1000000)); + deleteMap.put(String.format("file%s.parquet", i), index); + } + + Pair>> pair = + deletionVectorsIndexFile.write(deleteMap); + + // read + Map dvs = + deletionVectorsIndexFile.readAllDeletionVectors(pair.getLeft(), pair.getRight()); + assertThat(dvs.size()).isEqualTo(100000); + } + + @Test + public void testReadDvIndexWithEnormousDv() { + PathFactory pathFactory = getPathFactory(); + DeletionVectorsIndexFile deletionVectorsIndexFile = + new DeletionVectorsIndexFile(LocalFileIO.create(), pathFactory); + + // write + Random random = new Random(); + HashMap deleteMap = new HashMap<>(); + BitmapDeletionVector index = new BitmapDeletionVector(); + // dv index's size is about 20M + for (int i = 0; i < 10000000; i++) { + index.delete(random.nextInt(Integer.MAX_VALUE)); + } + deleteMap.put("largeFile.parquet", index); + Pair>> pair = + deletionVectorsIndexFile.write(deleteMap); + + // read + Map dvs = + deletionVectorsIndexFile.readAllDeletionVectors(pair.getLeft(), pair.getRight()); + assertThat(dvs.size()).isEqualTo(1); + } + + private PathFactory getPathFactory() { + Path dir = new Path(tempPath.toUri()); + return new PathFactory() { + @Override + public Path newPath() { + return new Path(dir, UUID.randomUUID().toString()); + } + + @Override + public Path toPath(String fileName) { + return new Path(dir, fileName); + } + }; + } } diff --git a/paimon-core/src/test/java/org/apache/paimon/index/IndexFileMetaSerializerTest.java b/paimon-core/src/test/java/org/apache/paimon/index/IndexFileMetaSerializerTest.java index 0e8cfc4302ef..53058a05f93c 100644 --- a/paimon-core/src/test/java/org/apache/paimon/index/IndexFileMetaSerializerTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/index/IndexFileMetaSerializerTest.java @@ -24,7 +24,6 @@ import org.apache.paimon.utils.Pair; import java.util.LinkedHashMap; -import java.util.Map; import java.util.Random; /** Test for {@link org.apache.paimon.index.IndexFileMetaSerializer}. */ @@ -49,7 +48,8 @@ public static IndexFileMeta randomIndexFile() { rnd.nextInt(), rnd.nextInt()); } else { - Map> deletionVectorsRanges = new LinkedHashMap<>(); + LinkedHashMap> deletionVectorsRanges = + new LinkedHashMap<>(); deletionVectorsRanges.put("my_file_name1", Pair.of(rnd.nextInt(), rnd.nextInt())); deletionVectorsRanges.put("my_file_name2", Pair.of(rnd.nextInt(), rnd.nextInt())); return new IndexFileMeta(