Skip to content

Commit

Permalink
[core] Use readFully in DeletionVectorsIndexFile and explicitly speci…
Browse files Browse the repository at this point in the history
…fy dvRanges' type as LinkedHashMap (apache#3283)
  • Loading branch information
Zouxxyy authored Apr 29, 2024
1 parent 251355c commit c796439
Show file tree
Hide file tree
Showing 7 changed files with 113 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,9 @@ static DeletionVector read(FileIO fileIO, DeletionFile deletionFile) throws IOEx
"Size not match, actual size: "
+ actualLength
+ ", expert size: "
+ deletionFile.length());
+ deletionFile.length()
+ ", file path: "
+ path);
}
int magicNum = dis.readInt();
if (magicNum == BitmapDeletionVector.MAGIC_NUMBER) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,10 @@ public DeletionVectorsIndexFile(FileIO fileIO, PathFactory pathFactory) {
* @throws RuntimeException If any error occurs while reading from the file.
*/
public Map<String, DeletionVector> readAllDeletionVectors(
String fileName, Map<String, Pair<Integer, Integer>> deletionVectorRanges) {
String fileName, LinkedHashMap<String, Pair<Integer, Integer>> deletionVectorRanges) {
Map<String, DeletionVector> deletionVectors = new HashMap<>();
try (SeekableInputStream inputStream =
fileIO.newInputStream(pathFactory.toPath(fileName))) {
Path filePath = pathFactory.toPath(fileName);
try (SeekableInputStream inputStream = fileIO.newInputStream(filePath)) {
checkVersion(inputStream);
DataInputStream dataInputStream = new DataInputStream(inputStream);
for (Map.Entry<String, Pair<Integer, Integer>> entry :
Expand All @@ -69,9 +69,13 @@ public Map<String, DeletionVector> readAllDeletionVectors(
entry.getKey(),
readDeletionVector(dataInputStream, entry.getValue().getRight()));
}
} catch (IOException e) {
throw new UncheckedIOException(
"Unable to read deletion vectors from file: " + fileName, e);
} catch (Exception e) {
throw new RuntimeException(
"Unable to read deletion vectors from file: "
+ filePath
+ ", deletionVectorRanges: "
+ deletionVectorRanges,
e);
}
return deletionVectors;
}
Expand All @@ -87,15 +91,19 @@ public Map<String, DeletionVector> readAllDeletionVectors(
*/
public DeletionVector readDeletionVector(
String fileName, Pair<Integer, Integer> deletionVectorRange) {
try (SeekableInputStream inputStream =
fileIO.newInputStream(pathFactory.toPath(fileName))) {
Path filePath = pathFactory.toPath(fileName);
try (SeekableInputStream inputStream = fileIO.newInputStream(filePath)) {
checkVersion(inputStream);
inputStream.seek(deletionVectorRange.getLeft());
DataInputStream dataInputStream = new DataInputStream(inputStream);
return readDeletionVector(dataInputStream, deletionVectorRange.getRight());
} catch (IOException e) {
throw new UncheckedIOException(
"Unable to read deletion vector from file: " + fileName, e);
} catch (Exception e) {
throw new RuntimeException(
"Unable to read deletion vector from file: "
+ filePath
+ ", deletionVectorRange: "
+ deletionVectorRange,
e);
}
}

Expand All @@ -111,12 +119,11 @@ public DeletionVector readDeletionVector(
* data is located.
* @throws UncheckedIOException If an I/O error occurs while writing to the file.
*/
public Pair<String, Map<String, Pair<Integer, Integer>>> write(
public Pair<String, LinkedHashMap<String, Pair<Integer, Integer>>> write(
Map<String, DeletionVector> input) {
int size = input.size();
// use LinkedHashMap to ensure that the order of DeletionVectorRanges and the written
// DeletionVectors is consistent.
Map<String, Pair<Integer, Integer>> deletionVectorRanges = new LinkedHashMap<>(size);
LinkedHashMap<String, Pair<Integer, Integer>> deletionVectorRanges =
new LinkedHashMap<>(size);
Path path = pathFactory.newPath();
try (DataOutputStream dataOutputStream =
new DataOutputStream(fileIO.newOutputStream(path, true))) {
Expand Down Expand Up @@ -158,11 +165,7 @@ private DeletionVector readDeletionVector(DataInputStream inputStream, int size)

// read DeletionVector bytes
byte[] bytes = new byte[size];
int readSize = inputStream.read(bytes);
if (readSize != size) {
throw new RuntimeException(
"Size not match, actual size: " + readSize + ", expert size: " + size);
}
inputStream.readFully(bytes);

// check checksum
int checkSum = calculateChecksum(bytes);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -171,7 +172,8 @@ public Map<String, DeletionVector> readAllDeletionVectors(IndexFileMeta fileMeta
throw new IllegalArgumentException(
"Input file is not deletion vectors index " + fileMeta.indexType());
}
Map<String, Pair<Integer, Integer>> deleteIndexRange = fileMeta.deletionVectorsRanges();
LinkedHashMap<String, Pair<Integer, Integer>> deleteIndexRange =
fileMeta.deletionVectorsRanges();
if (deleteIndexRange == null || deleteIndexRange.isEmpty()) {
return Collections.emptyMap();
}
Expand All @@ -193,7 +195,7 @@ public Optional<DeletionVector> readDeletionVector(IndexFileMeta fileMeta, Strin
}

public IndexFileMeta writeDeletionVectorsIndex(Map<String, DeletionVector> deletionVectors) {
Pair<String, Map<String, Pair<Integer, Integer>>> pair =
Pair<String, LinkedHashMap<String, Pair<Integer, Integer>>> pair =
deletionVectorsIndex.write(deletionVectors);
return new IndexFileMeta(
DELETION_VECTORS_INDEX,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@
import javax.annotation.Nullable;

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

import static org.apache.paimon.utils.SerializationUtils.newStringType;
Expand All @@ -43,8 +43,11 @@ public class IndexFileMeta {
private final long fileSize;
private final long rowCount;

/** Metadata only used by {@link DeletionVectorsIndexFile}. */
private final @Nullable Map<String, Pair<Integer, Integer>> deletionVectorsRanges;
/**
* Metadata only used by {@link DeletionVectorsIndexFile}, use LinkedHashMap to ensure that the
* order of DeletionVectorRanges and the written DeletionVectors is consistent.
*/
private final @Nullable LinkedHashMap<String, Pair<Integer, Integer>> deletionVectorsRanges;

public IndexFileMeta(String indexType, String fileName, long fileSize, long rowCount) {
this(indexType, fileName, fileSize, rowCount, null);
Expand All @@ -55,7 +58,7 @@ public IndexFileMeta(
String fileName,
long fileSize,
long rowCount,
@Nullable Map<String, Pair<Integer, Integer>> deletionVectorsRanges) {
@Nullable LinkedHashMap<String, Pair<Integer, Integer>> deletionVectorsRanges) {
this.indexType = indexType;
this.fileName = fileName;
this.fileSize = fileSize;
Expand All @@ -79,7 +82,7 @@ public long rowCount() {
return rowCount;
}

public @Nullable Map<String, Pair<Integer, Integer>> deletionVectorsRanges() {
public @Nullable LinkedHashMap<String, Pair<Integer, Integer>> deletionVectorsRanges() {
return deletionVectorsRanges;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.apache.paimon.utils.VersionedObjectSerializer;

import java.util.LinkedHashMap;
import java.util.Map;

/** A {@link VersionedObjectSerializer} for {@link IndexFileMeta}. */
public class IndexFileMetaSerializer extends ObjectSerializer<IndexFileMeta> {
Expand Down Expand Up @@ -60,7 +59,7 @@ public IndexFileMeta fromRow(InternalRow row) {
}

public static InternalArray dvRangesToRowArrayData(
Map<String, Pair<Integer, Integer>> dvRanges) {
LinkedHashMap<String, Pair<Integer, Integer>> dvRanges) {
return new GenericArray(
dvRanges.entrySet().stream()
.map(
Expand All @@ -72,9 +71,10 @@ public static InternalArray dvRangesToRowArrayData(
.toArray(GenericRow[]::new));
}

public static Map<String, Pair<Integer, Integer>> rowArrayDataToDvRanges(
public static LinkedHashMap<String, Pair<Integer, Integer>> rowArrayDataToDvRanges(
InternalArray arrayData) {
Map<String, Pair<Integer, Integer>> dvRanges = new LinkedHashMap<>(arrayData.size());
LinkedHashMap<String, Pair<Integer, Integer>> dvRanges =
new LinkedHashMap<>(arrayData.size());
for (int i = 0; i < arrayData.size(); i++) {
InternalRow row = arrayData.getRow(i, 3);
dvRanges.put(row.getString(0).toString(), Pair.of(row.getInt(1), row.getInt(2)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@
import org.junit.jupiter.api.io.TempDir;

import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Random;
import java.util.UUID;

import static org.assertj.core.api.Assertions.assertThat;
Expand All @@ -37,20 +39,8 @@ public class DeletionVectorsIndexFileTest {
@TempDir java.nio.file.Path tempPath;

@Test
public void test0() {
Path dir = new Path(tempPath.toUri());
PathFactory pathFactory =
new PathFactory() {
@Override
public Path newPath() {
return new Path(dir, UUID.randomUUID().toString());
}

@Override
public Path toPath(String fileName) {
return new Path(dir, fileName);
}
};
public void testReadDvIndex() {
PathFactory pathFactory = getPathFactory();

DeletionVectorsIndexFile deletionVectorsIndexFile =
new DeletionVectorsIndexFile(LocalFileIO.create(), pathFactory);
Expand All @@ -70,10 +60,10 @@ public Path toPath(String fileName) {
index3.delete(3);
deleteMap.put("file33.parquet", index3);

Pair<String, Map<String, Pair<Integer, Integer>>> pair =
Pair<String, LinkedHashMap<String, Pair<Integer, Integer>>> pair =
deletionVectorsIndexFile.write(deleteMap);
String fileName = pair.getLeft();
Map<String, Pair<Integer, Integer>> deletionVectorRanges = pair.getRight();
LinkedHashMap<String, Pair<Integer, Integer>> deletionVectorRanges = pair.getRight();

// read
Map<String, DeletionVector> actualDeleteMap =
Expand All @@ -94,4 +84,67 @@ public Path toPath(String fileName) {
deletionVectorsIndexFile.delete(fileName);
assertThat(deletionVectorsIndexFile.exists(fileName)).isFalse();
}

/**
 * Writes 100000 deletion vectors, each marking a single random position below 1000000, into one
 * index file and verifies that reading the file back yields the same number of entries.
 */
@Test
public void testReadDvIndexWithCopiousDv() {
    final int dvCount = 100000;
    PathFactory paths = getPathFactory();
    DeletionVectorsIndexFile indexFile = new DeletionVectorsIndexFile(LocalFileIO.create(), paths);

    // write: one single-position bitmap per (synthetic) data file name
    Random rnd = new Random();
    HashMap<String, DeletionVector> toWrite = new HashMap<>();
    int fileNo = 0;
    while (fileNo < dvCount) {
        BitmapDeletionVector dv = new BitmapDeletionVector();
        dv.delete(rnd.nextInt(1000000));
        toWrite.put("file" + fileNo + ".parquet", dv);
        fileNo++;
    }

    Pair<String, LinkedHashMap<String, Pair<Integer, Integer>>> written =
            indexFile.write(toWrite);
    String indexFileName = written.getLeft();
    LinkedHashMap<String, Pair<Integer, Integer>> ranges = written.getRight();

    // read: every vector that was written must come back
    Map<String, DeletionVector> restored =
            indexFile.readAllDeletionVectors(indexFileName, ranges);
    assertThat(restored.size()).isEqualTo(dvCount);
}

/**
 * Writes a single deletion vector populated with 10000000 random positions into an index file
 * and verifies that reading the file back yields exactly one entry.
 */
@Test
public void testReadDvIndexWithEnormousDv() {
    PathFactory paths = getPathFactory();
    DeletionVectorsIndexFile indexFile = new DeletionVectorsIndexFile(LocalFileIO.create(), paths);

    // write
    Random rnd = new Random();
    HashMap<String, DeletionVector> toWrite = new HashMap<>();
    BitmapDeletionVector largeDv = new BitmapDeletionVector();
    // per the original author's note, the resulting dv index is about 20M in size
    int remaining = 10000000;
    while (remaining-- > 0) {
        largeDv.delete(rnd.nextInt(Integer.MAX_VALUE));
    }
    toWrite.put("largeFile.parquet", largeDv);
    Pair<String, LinkedHashMap<String, Pair<Integer, Integer>>> written =
            indexFile.write(toWrite);
    String indexFileName = written.getLeft();
    LinkedHashMap<String, Pair<Integer, Integer>> ranges = written.getRight();

    // read: the one large vector must come back
    Map<String, DeletionVector> restored =
            indexFile.readAllDeletionVectors(indexFileName, ranges);
    assertThat(restored.size()).isEqualTo(1);
}

/**
 * Builds a {@link PathFactory} rooted at the per-test temporary directory: new paths get a
 * random UUID as their name, and named files resolve directly under the same directory.
 */
private PathFactory getPathFactory() {
    final Path root = new Path(tempPath.toUri());
    PathFactory factory =
            new PathFactory() {
                @Override
                public Path toPath(String fileName) {
                    return new Path(root, fileName);
                }

                @Override
                public Path newPath() {
                    String randomName = UUID.randomUUID().toString();
                    return new Path(root, randomName);
                }
            };
    return factory;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.apache.paimon.utils.Pair;

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Random;

/** Test for {@link org.apache.paimon.index.IndexFileMetaSerializer}. */
Expand All @@ -49,7 +48,8 @@ public static IndexFileMeta randomIndexFile() {
rnd.nextInt(),
rnd.nextInt());
} else {
Map<String, Pair<Integer, Integer>> deletionVectorsRanges = new LinkedHashMap<>();
LinkedHashMap<String, Pair<Integer, Integer>> deletionVectorsRanges =
new LinkedHashMap<>();
deletionVectorsRanges.put("my_file_name1", Pair.of(rnd.nextInt(), rnd.nextInt()));
deletionVectorsRanges.put("my_file_name2", Pair.of(rnd.nextInt(), rnd.nextInt()));
return new IndexFileMeta(
Expand Down

0 comments on commit c796439

Please sign in to comment.