Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Data files with delete records should not be upgraded directly to max level #2962

Merged
merged 2 commits into from
Mar 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 47 additions & 10 deletions paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import org.apache.paimon.types.IntType;
import org.apache.paimon.types.RowType;

import javax.annotation.Nullable;

import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.ArrayList;
Expand All @@ -58,6 +60,8 @@ public class DataFileMeta {

private final String fileName;
private final long fileSize;

// total number of rows (including add & delete) in this file
private final long rowCount;

private final BinaryRow minKey;
Expand All @@ -73,6 +77,12 @@ public class DataFileMeta {
private final List<String> extraFiles;
private final Timestamp creationTime;

// rowCount = addRowCount + deleteRowCount
// Why don't we keep addRowCount and deleteRowCount?
// Because previous versions of DataFileMeta stored only rowCount,
// and we must preserve backward compatibility with files written by them.
private final @Nullable Long deleteRowCount;

public static DataFileMeta forAppend(
String fileName,
long fileSize,
Expand All @@ -92,7 +102,8 @@ public static DataFileMeta forAppend(
minSequenceNumber,
maxSequenceNumber,
schemaId,
DUMMY_LEVEL);
DUMMY_LEVEL,
0L);
}

public DataFileMeta(
Expand All @@ -106,7 +117,8 @@ public DataFileMeta(
long minSequenceNumber,
long maxSequenceNumber,
long schemaId,
int level) {
int level,
@Nullable Long deleteRowCount) {
this(
fileName,
fileSize,
Expand All @@ -120,7 +132,8 @@ public DataFileMeta(
schemaId,
level,
Collections.emptyList(),
Timestamp.fromLocalDateTime(LocalDateTime.now()).toMillisTimestamp());
Timestamp.fromLocalDateTime(LocalDateTime.now()).toMillisTimestamp(),
deleteRowCount);
}

public DataFileMeta(
Expand All @@ -136,9 +149,11 @@ public DataFileMeta(
long schemaId,
int level,
List<String> extraFiles,
Timestamp creationTime) {
Timestamp creationTime,
@Nullable Long deleteRowCount) {
this.fileName = fileName;
this.fileSize = fileSize;

this.rowCount = rowCount;

this.minKey = minKey;
Expand All @@ -152,6 +167,8 @@ public DataFileMeta(
this.schemaId = schemaId;
this.extraFiles = Collections.unmodifiableList(extraFiles);
this.creationTime = creationTime;

this.deleteRowCount = deleteRowCount;
}

public String fileName() {
Expand All @@ -166,6 +183,14 @@ public long rowCount() {
return rowCount;
}

/**
 * Number of ADD rows in this file, derived as {@code rowCount - deleteRowCount}.
 *
 * @return empty when {@code deleteRowCount} is unknown, i.e. for files written by
 *     older versions that recorded only the total {@code rowCount}
 */
public Optional<Long> addRowCount() {
return Optional.ofNullable(deleteRowCount).map(c -> rowCount - c);
}

/**
 * Number of DELETE (retract) rows in this file.
 *
 * @return empty for files written by older versions, which did not record this count
 */
public Optional<Long> deleteRowCount() {
return Optional.ofNullable(deleteRowCount);
}

public BinaryRow minKey() {
return minKey;
}
Expand Down Expand Up @@ -250,7 +275,8 @@ public DataFileMeta upgrade(int newLevel) {
schemaId,
newLevel,
extraFiles,
creationTime);
creationTime,
deleteRowCount);
}

public List<Path> collectFiles(DataFilePathFactory pathFactory) {
Expand All @@ -274,11 +300,15 @@ public DataFileMeta copy(List<String> newExtraFiles) {
schemaId,
level,
newExtraFiles,
creationTime);
creationTime,
deleteRowCount);
}

@Override
public boolean equals(Object o) {
if (o == this) {
return true;
}
if (!(o instanceof DataFileMeta)) {
return false;
}
Expand All @@ -295,7 +325,8 @@ public boolean equals(Object o) {
&& schemaId == that.schemaId
&& level == that.level
&& Objects.equals(extraFiles, that.extraFiles)
&& Objects.equals(creationTime, that.creationTime);
&& Objects.equals(creationTime, that.creationTime)
&& Objects.equals(deleteRowCount, that.deleteRowCount);
}

@Override
Expand All @@ -313,13 +344,17 @@ public int hashCode() {
schemaId,
level,
extraFiles,
creationTime);
creationTime,
deleteRowCount);
}

@Override
public String toString() {
return String.format(
"{%s, %d, %d, %s, %s, %s, %s, %d, %d, %d, %d, %s, %s}",
"{fileName: %s, fileSize: %d, rowCount: %d, "
+ "minKey: %s, maxKey: %s, keyStats: %s, valueStats: %s, "
+ "minSequenceNumber: %d, maxSequenceNumber: %d, "
+ "schemaId: %d, level: %d, extraFiles: %s, creationTime: %s, deleteRowCount: %d}",
fileName,
fileSize,
rowCount,
Expand All @@ -332,7 +367,8 @@ public String toString() {
schemaId,
level,
extraFiles,
creationTime);
creationTime,
deleteRowCount);
}

public static RowType schema() {
Expand All @@ -350,6 +386,7 @@ public static RowType schema() {
fields.add(new DataField(10, "_LEVEL", new IntType(false)));
fields.add(new DataField(11, "_EXTRA_FILES", new ArrayType(false, newStringType(false))));
fields.add(new DataField(12, "_CREATION_TIME", DataTypes.TIMESTAMP_MILLIS()));
fields.add(new DataField(13, "_DELETE_ROW_COUNT", new BigIntType(true)));
return new RowType(fields);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ public InternalRow toRow(DataFileMeta meta) {
meta.schemaId(),
meta.level(),
toStringArrayData(meta.extraFiles()),
meta.creationTime());
meta.creationTime(),
meta.deleteRowCount().orElse(null));
}

@Override
Expand All @@ -71,6 +72,7 @@ public DataFileMeta fromRow(InternalRow row) {
row.getLong(9),
row.getInt(10),
fromStringArrayData(row.getArray(11)),
row.getTimestamp(12, 3));
row.getTimestamp(12, 3),
row.isNullAt(13) ? null : row.getLong(13));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ public class KeyValueDataFileWriter
private InternalRow maxKey = null;
private long minSeqNumber = Long.MAX_VALUE;
private long maxSeqNumber = Long.MIN_VALUE;
private long deleteRecordCount = 0;

public KeyValueDataFileWriter(
FileIO fileIO,
Expand Down Expand Up @@ -111,6 +112,10 @@ public void write(KeyValue kv) throws IOException {
updateMinSeqNumber(kv);
updateMaxSeqNumber(kv);

if (kv.valueKind().isRetract()) {
deleteRecordCount++;
}

if (LOG.isDebugEnabled()) {
LOG.debug("Write to Path " + path + " key value " + kv.toString(keyType, valueType));
}
Expand Down Expand Up @@ -162,6 +167,7 @@ public DataFileMeta result() throws IOException {
minSeqNumber,
maxSeqNumber,
schemaId,
level);
level,
deleteRecordCount);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,11 @@ public Levels(
this.keyComparator = keyComparator;

// in case the num of levels is not specified explicitly
int restoredMaxLevel =
int restoredNumLevels =
Math.max(
numLevels,
inputFiles.stream().mapToInt(DataFileMeta::level).max().orElse(-1) + 1);
checkArgument(restoredMaxLevel > 1, "levels must be at least 2.");
checkArgument(restoredNumLevels > 1, "Number of levels must be at least 2.");
this.level0 =
new TreeSet<>(
(a, b) -> {
Expand All @@ -70,7 +70,7 @@ public Levels(
}
});
this.levels = new ArrayList<>();
for (int i = 1; i < restoredMaxLevel; i++) {
for (int i = 1; i < restoredNumLevels; i++) {
levels.add(SortedRun.empty());
}

Expand Down Expand Up @@ -108,6 +108,10 @@ public int numberOfLevels() {
return levels.size() + 1;
}

/**
 * Returns the highest level index. Level 0 is kept separately; {@code levels} holds
 * levels 1..{@code levels.size()}, so the max level equals {@code levels.size()}
 * (one less than {@code numberOfLevels()}).
 */
public int maxLevel() {
return levels.size();
}

public int numberOfSortedRuns() {
int numberOfSortedRuns = level0.size();
for (SortedRun run : levels) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ private void submitCompaction(CompactUnit unit, boolean dropDelete) {
rewriter,
unit,
dropDelete,
levels.maxLevel(),
metricsReporter);
if (LOG.isDebugEnabled()) {
LOG.debug(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public class MergeTreeCompactTask extends CompactTask {
private final List<List<SortedRun>> partitioned;

private final boolean dropDelete;
private final int maxLevel;

// metric
private int upgradeFilesNum;
Expand All @@ -54,13 +55,15 @@ public MergeTreeCompactTask(
CompactRewriter rewriter,
CompactUnit unit,
boolean dropDelete,
int maxLevel,
@Nullable CompactionMetrics.Reporter metricsReporter) {
super(metricsReporter);
this.minFileSize = minFileSize;
this.rewriter = rewriter;
this.outputLevel = unit.outputLevel();
this.partitioned = new IntervalPartition(unit.files(), keyComparator).partition();
this.dropDelete = dropDelete;
this.maxLevel = maxLevel;

this.upgradeFilesNum = 0;
}
Expand Down Expand Up @@ -107,10 +110,20 @@ protected String logMetric(
}

private void upgrade(DataFileMeta file, CompactResult toUpdate) throws Exception {
if (file.level() != outputLevel) {
if (file.level() == outputLevel) {
return;
}

if (outputLevel != maxLevel || file.deleteRowCount().map(d -> d == 0).orElse(false)) {
CompactResult upgradeResult = rewriter.upgrade(outputLevel, file);
toUpdate.merge(upgradeResult);
upgradeFilesNum++;
} else {
// files with delete records should not be upgraded directly to max level
List<List<SortedRun>> candidate = new ArrayList<>();
candidate.add(new ArrayList<>());
candidate.get(0).add(SortedRun.fromSingle(file));
rewriteImpl(candidate, toUpdate);
}
}

Expand All @@ -130,6 +143,11 @@ private void rewrite(List<List<SortedRun>> candidate, CompactResult toUpdate) th
return;
}
}
rewriteImpl(candidate, toUpdate);
}

private void rewriteImpl(List<List<SortedRun>> candidate, CompactResult toUpdate)
throws Exception {
CompactResult rewriteResult = rewriter.rewrite(outputLevel, dropDelete, candidate);
toUpdate.merge(rewriteResult);
candidate.clear();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ private DataFileMeta newFile(long fileSize) {
0,
0,
0,
0);
0,
0L);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,8 @@ private static DataFileMeta newFile(long timeMillis) {
Timestamp.fromLocalDateTime(
Instant.ofEpochMilli(timeMillis)
.atZone(ZoneId.systemDefault())
.toLocalDateTime()));
.toLocalDateTime()),
0L);
}

private Pair<InternalRow, Integer> row(int pt, int col, int pk, int bucket) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,8 @@ private Data createDataFile(List<KeyValue> kvs, int level, BinaryRow partition,
minSequenceNumber,
maxSequenceNumber,
0,
level),
level,
kvs.stream().filter(kv -> kv.valueKind().isRetract()).count()),
kvs);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public static DataFileMeta newFile(long minSeq, long maxSeq) {
return new DataFileMeta(
"",
maxSeq - minSeq + 1,
maxSeq - minSeq + 1,
0L,
DataFileMeta.EMPTY_MIN_KEY,
DataFileMeta.EMPTY_MAX_KEY,
DataFileMeta.EMPTY_KEY_STATS,
Expand All @@ -50,7 +50,8 @@ public static DataFileMeta newFile(long minSeq, long maxSeq) {
0L,
DataFileMeta.DUMMY_LEVEL,
Collections.emptyList(),
Timestamp.fromEpochMillis(100));
Timestamp.fromEpochMillis(100),
maxSeq - minSeq + 1);
}

public static DataFileMeta newFile() {
Expand All @@ -65,7 +66,8 @@ public static DataFileMeta newFile() {
0,
0,
0,
0);
0,
0L);
}

public static DataFileMeta newFile(
Expand All @@ -81,7 +83,8 @@ public static DataFileMeta newFile(
0,
maxSequence,
0,
level);
level,
0L);
}

public static BinaryRow row(int i) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ public static DataFileMeta newFile(int name, int level) {
0,
1,
0,
level);
level,
0L);
}
}
Loading
Loading