Skip to content

Commit

Permalink
v2
Browse files Browse the repository at this point in the history
  • Loading branch information
Zouxxyy committed Dec 13, 2024
1 parent ce1c839 commit e28f834
Show file tree
Hide file tree
Showing 21 changed files with 553 additions and 163 deletions.
8 changes: 6 additions & 2 deletions docs/content/concepts/spec/manifest.md
Original file line number Diff line number Diff line change
Expand Up @@ -111,5 +111,9 @@ The index file meta is:
2. fileName: file name.
3. fileSize: file size.
4. rowCount: total number of rows.
5. deletionVectorsRanges: Metadata only used by "DELETION_VECTORS", Stores offset and length of each data file,
The schema is `ARRAY<ROW<f0: STRING, f1: INT, f2: INT>>`.
5. deletionVectorsRanges: metadata used only by "DELETION_VECTORS". It is an array of deletion vector metas; the schema of each deletion vector meta is:
1. f0: the name of the data file that this deletion vector corresponds to.
2. f1: the starting offset of this deletion vector in the index file.
3. f2: the length of this deletion vector in the index file.
4. _CARDINALITY: the number of deleted rows (nullable).

Original file line number Diff line number Diff line change
Expand Up @@ -117,4 +117,9 @@ public boolean equals(Object o) {
BitmapDeletionVector that = (BitmapDeletionVector) o;
return Objects.equals(this.roaringBitmap, that.roaringBitmap);
}

/**
 * Computes the hash code from the underlying bitmap only, which is the same field that {@code
 * equals} compares.
 *
 * @return the bitmap's hash code, or {@code 0} when the bitmap reference is {@code null}
 */
@Override
public int hashCode() {
    // Same result as Objects.hashCode(roaringBitmap), written out explicitly.
    return roaringBitmap == null ? 0 : roaringBitmap.hashCode();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@

import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.index.DeletionVectorMeta;
import org.apache.paimon.index.IndexFileMeta;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.PathFactory;
import org.apache.paimon.utils.Preconditions;

Expand Down Expand Up @@ -104,13 +104,13 @@ private class SingleIndexFileWriter implements Closeable {

private final Path path;
private final DataOutputStream dataOutputStream;
private final LinkedHashMap<String, Pair<Integer, Integer>> dvRanges;
private final LinkedHashMap<String, DeletionVectorMeta> dvMetas;

private SingleIndexFileWriter() throws IOException {
this.path = indexPathFactory.newPath();
this.dataOutputStream = new DataOutputStream(fileIO.newOutputStream(path, true));
dataOutputStream.writeByte(VERSION_ID_V1);
this.dvRanges = new LinkedHashMap<>();
this.dvMetas = new LinkedHashMap<>();
}

private long writtenSizeInBytes() {
Expand All @@ -121,7 +121,10 @@ private void write(String key, DeletionVector deletionVector) throws IOException
Preconditions.checkNotNull(dataOutputStream);
byte[] data = deletionVector.serializeToBytes();
int size = data.length;
dvRanges.put(key, Pair.of(dataOutputStream.size(), size));
dvMetas.put(
key,
new DeletionVectorMeta(
key, dataOutputStream.size(), size, deletionVector.getCardinality()));
dataOutputStream.writeInt(size);
dataOutputStream.write(data);
dataOutputStream.writeInt(calculateChecksum(data));
Expand All @@ -132,8 +135,8 @@ public IndexFileMeta writtenIndexFile() {
DELETION_VECTORS_INDEX,
path.getName(),
writtenSizeInBytes(),
dvRanges.size(),
dvRanges);
dvMetas.size(),
dvMetas);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.SeekableInputStream;
import org.apache.paimon.index.DeletionVectorMeta;
import org.apache.paimon.index.IndexFile;
import org.apache.paimon.index.IndexFileMeta;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.table.source.DeletionFile;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.PathFactory;

import java.io.DataInputStream;
Expand Down Expand Up @@ -63,28 +63,27 @@ public DeletionVectorsIndexFile(
* @throws UncheckedIOException If an I/O error occurs while reading from the file.
*/
public Map<String, DeletionVector> readAllDeletionVectors(IndexFileMeta fileMeta) {
LinkedHashMap<String, Pair<Integer, Integer>> deletionVectorRanges =
fileMeta.deletionVectorsRanges();
checkNotNull(deletionVectorRanges);
LinkedHashMap<String, DeletionVectorMeta> deletionVectorMetas =
fileMeta.deletionVectorMetas();
checkNotNull(deletionVectorMetas);

String indexFileName = fileMeta.fileName();
Map<String, DeletionVector> deletionVectors = new HashMap<>();
Path filePath = pathFactory.toPath(indexFileName);
try (SeekableInputStream inputStream = fileIO.newInputStream(filePath)) {
checkVersion(inputStream);
DataInputStream dataInputStream = new DataInputStream(inputStream);
for (Map.Entry<String, Pair<Integer, Integer>> entry :
deletionVectorRanges.entrySet()) {
for (DeletionVectorMeta deletionVectorMeta : deletionVectorMetas.values()) {
deletionVectors.put(
entry.getKey(),
readDeletionVector(dataInputStream, entry.getValue().getRight()));
deletionVectorMeta.dataFileName(),
readDeletionVector(dataInputStream, deletionVectorMeta.length()));
}
} catch (Exception e) {
throw new RuntimeException(
"Unable to read deletion vectors from file: "
+ filePath
+ ", deletionVectorRanges: "
+ deletionVectorRanges,
+ ", deletionVectorMetas: "
+ deletionVectorMetas,
e);
}
return deletionVectors;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.index;

import org.apache.paimon.types.BigIntType;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.IntType;
import org.apache.paimon.types.RowType;

import javax.annotation.Nullable;

import java.util.Objects;

import static org.apache.paimon.utils.SerializationUtils.newStringType;

/**
 * Metadata describing one deletion vector stored inside a deletion vectors index file.
 *
 * <p>An instance records the data file the deletion vector belongs to, the position of the
 * serialized vector in the index file (offset and length) and, optionally, the number of rows it
 * marks as deleted.
 */
public class DeletionVectorMeta {

    /**
     * Row schema of a single deletion vector meta: data file name ({@code f0}), offset ({@code
     * f1}), length ({@code f2}) and the cardinality, which is the only field created with the
     * nullable flag set.
     */
    public static final RowType SCHEMA =
            RowType.of(
                    new DataField(0, "f0", newStringType(false)),
                    new DataField(1, "f1", new IntType(false)),
                    new DataField(2, "f2", new IntType(false)),
                    new DataField(3, "_CARDINALITY", new BigIntType(true)));

    /** Name of the data file this deletion vector applies to. */
    private final String dataFileName;

    /** Starting offset of the deletion vector in the index file. */
    private final int offset;

    /** Length of the deletion vector in the index file. */
    private final int length;

    /** Number of deleted rows; may be {@code null}. */
    @Nullable private final Long cardinality;

    /**
     * Creates the metadata of one deletion vector.
     *
     * @param dataFileName name of the data file the deletion vector belongs to
     * @param start starting offset of the deletion vector in the index file
     * @param size length of the deletion vector in the index file
     * @param cardinality number of deleted rows, or {@code null} if it is not available
     */
    public DeletionVectorMeta(
            String dataFileName, int start, int size, @Nullable Long cardinality) {
        this.cardinality = cardinality;
        this.length = size;
        this.offset = start;
        this.dataFileName = dataFileName;
    }

    /** Returns the name of the data file this deletion vector corresponds to. */
    public String dataFileName() {
        return dataFileName;
    }

    /** Returns the starting offset of this deletion vector in the index file. */
    public int offset() {
        return offset;
    }

    /** Returns the length of this deletion vector in the index file. */
    public int length() {
        return length;
    }

    /** Returns the number of deleted rows, or {@code null} when it was not recorded. */
    @Nullable
    public Long cardinality() {
        return cardinality;
    }

    /**
     * Two metas are equal when they are of exactly the same class and all four fields match.
     *
     * @param o the object to compare with
     * @return {@code true} if {@code o} is an equal {@code DeletionVectorMeta}
     */
    @Override
    public boolean equals(Object o) {
        if (o == null) {
            return false;
        }
        if (getClass() != o.getClass()) {
            return false;
        }
        DeletionVectorMeta other = (DeletionVectorMeta) o;
        // Compare the primitive fields first, then the nullable references.
        if (offset != other.offset || length != other.length) {
            return false;
        }
        return Objects.equals(dataFileName, other.dataFileName)
                && Objects.equals(cardinality, other.cardinality);
    }

    /** Hashes the same four fields that {@link #equals(Object)} compares. */
    @Override
    public int hashCode() {
        return Objects.hash(dataFileName, offset, length, cardinality);
    }

    /** Renders all fields in the form {@code DeletionVectorMeta{dataFileName='...', ...}}. */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("DeletionVectorMeta{");
        text.append("dataFileName='").append(dataFileName).append('\'');
        text.append(", offset=").append(offset);
        text.append(", length=").append(length);
        text.append(", cardinality=").append(cardinality);
        text.append('}');
        return text.toString();
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -100,15 +100,16 @@ public Map<String, DeletionFile> scanDVIndex(
if (meta.indexType().equals(DELETION_VECTORS_INDEX)
&& file.partition().equals(partition)
&& file.bucket() == bucket) {
LinkedHashMap<String, Pair<Integer, Integer>> dvRanges =
meta.deletionVectorsRanges();
checkNotNull(dvRanges);
for (String dataFile : dvRanges.keySet()) {
Pair<Integer, Integer> pair = dvRanges.get(dataFile);
DeletionFile deletionFile =
LinkedHashMap<String, DeletionVectorMeta> dvMetas = meta.deletionVectorMetas();
checkNotNull(dvMetas);
for (DeletionVectorMeta dvMeta : dvMetas.values()) {
result.put(
dvMeta.dataFileName(),
new DeletionFile(
filePath(meta).toString(), pair.getLeft(), pair.getRight());
result.put(dataFile, deletionFile);
filePath(meta).toString(),
dvMeta.offset(),
dvMeta.length(),
dvMeta.cardinality()));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,7 @@
import org.apache.paimon.types.ArrayType;
import org.apache.paimon.types.BigIntType;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.IntType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.Pair;

import javax.annotation.Nullable;

Expand Down Expand Up @@ -54,12 +52,7 @@ public class IndexFileMeta {
new DataField(
4,
"_DELETIONS_VECTORS_RANGES",
new ArrayType(
true,
RowType.of(
newStringType(false),
new IntType(false),
new IntType(false))))));
new ArrayType(true, DeletionVectorMeta.SCHEMA))));

private final String indexType;
private final String fileName;
Expand All @@ -68,9 +61,9 @@ public class IndexFileMeta {

/**
* Metadata only used by {@link DeletionVectorsIndexFile}, use LinkedHashMap to ensure that the
* order of DeletionVectorRanges and the written DeletionVectors is consistent.
* order of DeletionVectorMetas and the written DeletionVectors is consistent.
*/
private final @Nullable LinkedHashMap<String, Pair<Integer, Integer>> deletionVectorsRanges;
private final @Nullable LinkedHashMap<String, DeletionVectorMeta> deletionVectorMetas;

public IndexFileMeta(String indexType, String fileName, long fileSize, long rowCount) {
this(indexType, fileName, fileSize, rowCount, null);
Expand All @@ -81,12 +74,12 @@ public IndexFileMeta(
String fileName,
long fileSize,
long rowCount,
@Nullable LinkedHashMap<String, Pair<Integer, Integer>> deletionVectorsRanges) {
@Nullable LinkedHashMap<String, DeletionVectorMeta> deletionVectorMetas) {
this.indexType = indexType;
this.fileName = fileName;
this.fileSize = fileSize;
this.rowCount = rowCount;
this.deletionVectorsRanges = deletionVectorsRanges;
this.deletionVectorMetas = deletionVectorMetas;
}

public String indexType() {
Expand All @@ -105,8 +98,8 @@ public long rowCount() {
return rowCount;
}

public @Nullable LinkedHashMap<String, Pair<Integer, Integer>> deletionVectorsRanges() {
return deletionVectorsRanges;
public @Nullable LinkedHashMap<String, DeletionVectorMeta> deletionVectorMetas() {
return deletionVectorMetas;
}

@Override
Expand All @@ -122,12 +115,12 @@ public boolean equals(Object o) {
&& Objects.equals(fileName, that.fileName)
&& fileSize == that.fileSize
&& rowCount == that.rowCount
&& Objects.equals(deletionVectorsRanges, that.deletionVectorsRanges);
&& Objects.equals(deletionVectorMetas, that.deletionVectorMetas);
}

@Override
public int hashCode() {
return Objects.hash(indexType, fileName, fileSize, rowCount, deletionVectorsRanges);
return Objects.hash(indexType, fileName, fileSize, rowCount, deletionVectorMetas);
}

@Override
Expand All @@ -142,8 +135,8 @@ public String toString() {
+ fileSize
+ ", rowCount="
+ rowCount
+ ", deletionVectorsRanges="
+ deletionVectorsRanges
+ ", deletionVectorMetas="
+ deletionVectorMetas
+ '}';
}
}
Loading

0 comments on commit e28f834

Please sign in to comment.