Skip to content

Commit

Permalink
[core] Only keep deleted Identifiers in memory for Manifest full merge
Browse files Browse the repository at this point in the history
  • Loading branch information
JingsongLi authored Sep 10, 2024
1 parent 19bcd81 commit c32b06e
Show file tree
Hide file tree
Showing 7 changed files with 185 additions and 117 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,23 @@
package org.apache.paimon.manifest;

import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.Filter;
import org.apache.paimon.utils.Preconditions;

import javax.annotation.Nullable;

import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.apache.paimon.utils.ManifestReadThreadPool.sequentialBatchedExecute;

Expand Down Expand Up @@ -60,40 +67,74 @@ class Identifier {
public final int bucket;
public final int level;
public final String fileName;
public final List<String> extraFiles;
@Nullable private final byte[] embeddedIndex;

/* Cache the hash code for the string */
private Integer hash;

public Identifier(BinaryRow partition, int bucket, int level, String fileName) {
public Identifier(
BinaryRow partition,
int bucket,
int level,
String fileName,
List<String> extraFiles,
@Nullable byte[] embeddedIndex) {
this.partition = partition;
this.bucket = bucket;
this.level = level;
this.fileName = fileName;
this.extraFiles = extraFiles;
this.embeddedIndex = embeddedIndex;
}

@Override
public boolean equals(Object o) {
if (!(o instanceof Identifier)) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
Identifier that = (Identifier) o;
return Objects.equals(partition, that.partition)
&& bucket == that.bucket
return bucket == that.bucket
&& level == that.level
&& Objects.equals(fileName, that.fileName);
&& Objects.equals(partition, that.partition)
&& Objects.equals(fileName, that.fileName)
&& Objects.equals(extraFiles, that.extraFiles)
&& Objects.deepEquals(embeddedIndex, that.embeddedIndex);
}

@Override
public int hashCode() {
if (hash == null) {
hash = Objects.hash(partition, bucket, level, fileName);
hash =
Objects.hash(
partition,
bucket,
level,
fileName,
extraFiles,
Arrays.hashCode(embeddedIndex));
}
return hash;
}

@Override
public String toString() {
return String.format("{%s, %d, %d, %s}", partition, bucket, level, fileName);
return "{partition="
+ partition
+ ", bucket="
+ bucket
+ ", level="
+ level
+ ", fileName="
+ fileName
+ ", extraFiles="
+ extraFiles
+ ", embeddedIndex="
+ Arrays.toString(embeddedIndex)
+ '}';
}

public String toString(FileStorePathFactory pathFactory) {
Expand All @@ -103,7 +144,11 @@ public String toString(FileStorePathFactory pathFactory) {
+ ", level "
+ level
+ ", file "
+ fileName;
+ fileName
+ ", extraFiles "
+ extraFiles
+ ", embeddedIndex "
+ Arrays.toString(embeddedIndex);
}
}

Expand Down Expand Up @@ -161,4 +206,37 @@ static Iterable<ManifestEntry> readManifestEntries(
manifestFiles,
manifestReadParallelism);
}

/**
 * Reads the {@link Identifier}s of all DELETE entries contained in the given manifest files.
 *
 * <p>Manifests whose metadata reports zero deleted files are skipped entirely, so only
 * deleted identifiers — not full entries — are ever held in memory.
 *
 * @param manifestFile reader used to load manifest contents
 * @param manifestFiles candidate manifest file metas
 * @param manifestReadParallelism optional parallelism for batched reading; null means default
 * @return the set of identifiers of all deleted entries
 */
static Set<Identifier> readDeletedEntries(
        ManifestFile manifestFile,
        List<ManifestFileMeta> manifestFiles,
        @Nullable Integer manifestReadParallelism) {
    // Only manifests that actually contain DELETE entries need to be read.
    List<ManifestFileMeta> withDeletions =
            manifestFiles.stream()
                    .filter(meta -> meta.numDeletedFiles() > 0)
                    .collect(Collectors.toList());
    // Per manifest: read only DELETE rows (row-level filter) and project to identifiers.
    Function<ManifestFileMeta, List<Identifier>> readDeleted =
            meta -> {
                List<ManifestEntry> deleted =
                        manifestFile.read(
                                meta.fileName(),
                                meta.fileSize(),
                                Filter.alwaysTrue(),
                                deletedFilter());
                return deleted.stream()
                        .map(ManifestEntry::identifier)
                        .collect(Collectors.toList());
            };
    // Batched, possibly parallel execution; fold every identifier into one set.
    Set<Identifier> deletedIdentifiers = new HashSet<>();
    sequentialBatchedExecute(readDeleted, withDeletions, manifestReadParallelism)
            .forEach(deletedIdentifiers::add);
    return deletedIdentifiers;
}

/**
 * Returns a row-level filter accepting only rows whose serialized kind decodes to
 * {@link FileKind#DELETE}.
 */
static Filter<InternalRow> deletedFilter() {
    Function<InternalRow, FileKind> kindOf = ManifestEntrySerializer.kindGetter();
    return row -> FileKind.DELETE == kindOf.apply(row);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,13 @@ public DataFileMeta file() {

@Override
public Identifier identifier() {
return new Identifier(partition, bucket, file.level(), file.fileName());
return new Identifier(
partition,
bucket,
file.level(),
file.fileName(),
file.extraFiles(),
file.embeddedIndex());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ public ManifestEntry convertFrom(int version, InternalRow row) {
dataFileMetaSerializer.fromRow(row.getRow(4, dataFileMetaSerializer.numFields())));
}

/**
 * Returns a getter that decodes the entry kind from a serialized manifest-entry row.
 * The kind is stored as a single byte at field position 1.
 */
public static Function<InternalRow, FileKind> kindGetter() {
    return row -> {
        byte value = row.getByte(1);
        return FileKind.fromByteValue(value);
    };
}

/**
 * Returns a getter that decodes the partition from a serialized manifest-entry row.
 * The partition is stored as serialized binary at field position 2.
 */
public static Function<InternalRow, BinaryRow> partitionGetter() {
    return row -> {
        byte[] serialized = row.getBinary(2);
        return deserializeBinaryRow(serialized);
    };
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

import org.apache.paimon.data.BinaryRow;

import javax.annotation.Nullable;

import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
Expand All @@ -32,6 +34,8 @@ public class SimpleFileEntry implements FileEntry {
private final int bucket;
private final int level;
private final String fileName;
private final List<String> extraFiles;
@Nullable private final byte[] embeddedIndex;
private final BinaryRow minKey;
private final BinaryRow maxKey;

Expand All @@ -41,13 +45,17 @@ public SimpleFileEntry(
int bucket,
int level,
String fileName,
List<String> extraFiles,
@Nullable byte[] embeddedIndex,
BinaryRow minKey,
BinaryRow maxKey) {
this.kind = kind;
this.partition = partition;
this.bucket = bucket;
this.level = level;
this.fileName = fileName;
this.extraFiles = extraFiles;
this.embeddedIndex = embeddedIndex;
this.minKey = minKey;
this.maxKey = maxKey;
}
Expand All @@ -59,6 +67,8 @@ public static SimpleFileEntry from(ManifestEntry entry) {
entry.bucket(),
entry.level(),
entry.fileName(),
entry.file().extraFiles(),
entry.file().embeddedIndex(),
entry.minKey(),
entry.maxKey());
}
Expand Down Expand Up @@ -94,7 +104,7 @@ public String fileName() {

@Override
public Identifier identifier() {
return new Identifier(partition, bucket, level, fileName);
return new Identifier(partition, bucket, level, fileName, extraFiles, embeddedIndex);
}

@Override
Expand All @@ -121,13 +131,14 @@ public boolean equals(Object o) {
&& kind == that.kind
&& Objects.equals(partition, that.partition)
&& Objects.equals(fileName, that.fileName)
&& Objects.equals(extraFiles, that.extraFiles)
&& Objects.equals(minKey, that.minKey)
&& Objects.equals(maxKey, that.maxKey);
}

@Override
public int hashCode() {
return Objects.hash(kind, partition, bucket, level, fileName, minKey, maxKey);
return Objects.hash(kind, partition, bucket, level, fileName, extraFiles, minKey, maxKey);
}

@Override
Expand All @@ -141,9 +152,10 @@ public String toString() {
+ bucket
+ ", level="
+ level
+ ", fileName='"
+ ", fileName="
+ fileName
+ '\''
+ ", extraFiles="
+ extraFiles
+ ", minKey="
+ minKey
+ ", maxKey="
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.utils.VersionedObjectSerializer;

import static org.apache.paimon.utils.InternalRowUtils.fromStringArrayData;
import static org.apache.paimon.utils.SerializationUtils.deserializeBinaryRow;

/** A {@link VersionedObjectSerializer} for {@link SimpleFileEntry}, only supports reading. */
Expand Down Expand Up @@ -59,6 +60,8 @@ public SimpleFileEntry convertFrom(int version, InternalRow row) {
row.getInt(2),
file.getInt(10),
file.getString(0).toString(),
fromStringArrayData(file.getArray(11)),
file.isNullAt(14) ? null : file.getBinary(14),
deserializeBinaryRow(file.getBinary(3)),
deserializeBinaryRow(file.getBinary(4)));
}
Expand Down
Loading

0 comments on commit c32b06e

Please sign in to comment.