Skip to content

Commit

Permalink
Step3: support write delete map index in lookup
Browse files Browse the repository at this point in the history
  • Loading branch information
Zouxxyy committed Feb 7, 2024
1 parent ac62934 commit 85a069b
Show file tree
Hide file tree
Showing 19 changed files with 277 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.index.HashIndexMaintainer;
import org.apache.paimon.index.IndexMaintainer;
import org.apache.paimon.index.delete.DeleteIndex;
import org.apache.paimon.index.delete.DeleteMapIndexMaintainer;
import org.apache.paimon.io.KeyValueFileReaderFactory;
import org.apache.paimon.manifest.ManifestCacheFilter;
import org.apache.paimon.mergetree.compact.MergeFunctionFactory;
Expand Down Expand Up @@ -147,6 +149,10 @@ public KeyValueFileStoreWrite newWrite(String commitUser, ManifestCacheFilter ma
if (bucketMode() == BucketMode.DYNAMIC) {
indexFactory = new HashIndexMaintainer.Factory(newIndexFileHandler());
}
IndexMaintainer.Factory<KeyValue, DeleteIndex> deleteMapFactory = null;
if (options.deleteMapEnabled()) {
deleteMapFactory = new DeleteMapIndexMaintainer.Factory(newIndexFileHandler());
}
return new KeyValueFileStoreWrite(
fileIO,
schemaManager,
Expand All @@ -162,6 +168,7 @@ public KeyValueFileStoreWrite newWrite(String commitUser, ManifestCacheFilter ma
snapshotManager(),
newScan(true).withManifestCacheFilter(manifestFilter),
indexFactory,
deleteMapFactory,
options,
keyValueFieldsExtractor,
tableName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.paimon.index.delete;

import org.apache.paimon.KeyValue;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.index.IndexFileHandler;
import org.apache.paimon.index.IndexFileMeta;
Expand Down Expand Up @@ -115,6 +116,12 @@ public Optional<DeleteIndex> indexOf(String fileName) {
return Optional.ofNullable(deleteMap.get(fileName));
}

@VisibleForTesting
public Map<String, DeleteIndex> deleteMap() {
restoreDeleteMap();
return deleteMap;
}

/** Factory to restore {@link DeleteMapIndexMaintainer}. */
public static class Factory implements IndexMaintainer.Factory<KeyValue, DeleteIndex> {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ public class LookupLevels implements Levels.DropFileCallback, Closeable {
private final LookupStoreFactory lookupStoreFactory;
private final Cache<String, LookupFile> lookupFiles;
private final Function<Long, BloomFilter.Builder> bfGenerator;
private final boolean deleteMapEnabled;

public LookupLevels(
Levels levels,
Expand All @@ -79,7 +80,8 @@ public LookupLevels(
LookupStoreFactory lookupStoreFactory,
Duration fileRetention,
MemorySize maxDiskSize,
Function<Long, BloomFilter.Builder> bfGenerator) {
Function<Long, BloomFilter.Builder> bfGenerator,
boolean deleteMapEnabled) {
this.levels = levels;
this.keyComparator = keyComparator;
this.keySerializer = new RowCompactedSerializer(keyType);
Expand All @@ -96,6 +98,7 @@ public LookupLevels(
.executor(MoreExecutors.directExecutor())
.build();
this.bfGenerator = bfGenerator;
this.deleteMapEnabled = deleteMapEnabled;
levels.addDropFileCallback(this);
}

Expand Down Expand Up @@ -145,11 +148,19 @@ private KeyValue lookup(InternalRow key, DataFileMeta file) throws IOException {
return null;
}
InternalRow value = valueSerializer.deserialize(valueBytes);
long sequenceNumber = MemorySegment.wrap(valueBytes).getLong(valueBytes.length - 9);
long sequenceNumber =
MemorySegment.wrap(valueBytes).getLongBigEndian(valueBytes.length - 9);
RowKind rowKind = RowKind.fromByteValue(valueBytes[valueBytes.length - 1]);
return new KeyValue()
.replace(key, sequenceNumber, rowKind, value)
.setLevel(lookupFile.remoteFile().level());
KeyValue keyValue =
new KeyValue()
.replace(key, sequenceNumber, rowKind, value)
.setLevel(lookupFile.remoteFile().level());
if (deleteMapEnabled) {
keyValue.setPosition(
MemorySegment.wrap(valueBytes).getLongBigEndian(valueBytes.length - 17))
.setFileName(file.fileName());
}
return keyValue;
}

private int fileWeigh(String file, LookupFile lookupFile) {
Expand Down Expand Up @@ -183,6 +194,9 @@ private LookupFile createLookupFile(DataFileMeta file) throws IOException {
byte[] keyBytes = keySerializer.serializeToBytes(kv.key());
valueOut.clear();
valueOut.write(valueSerializer.serializeToBytes(kv.value()));
if (deleteMapEnabled) {
valueOut.writeLong(kv.position());
}
valueOut.writeLong(kv.sequenceNumber());
valueOut.writeByte(kv.valueKind().toByteValue());
byte[] valueBytes = valueOut.getCopyOfBuffer();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import org.apache.paimon.codegen.RecordEqualiser;
import org.apache.paimon.compact.CompactResult;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.index.IndexMaintainer;
import org.apache.paimon.index.delete.DeleteIndex;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.KeyValueFileReaderFactory;
import org.apache.paimon.io.KeyValueFileWriterFactory;
Expand All @@ -32,6 +34,8 @@
import org.apache.paimon.mergetree.SortedRun;
import org.apache.paimon.reader.RecordReaderIterator;

import javax.annotation.Nullable;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
Expand All @@ -47,6 +51,7 @@ public abstract class ChangelogMergeTreeRewriter extends MergeTreeCompactRewrite
protected final MergeEngine mergeEngine;
protected final RecordEqualiser valueEqualiser;
protected final boolean changelogRowDeduplicate;
protected final @Nullable IndexMaintainer<KeyValue, DeleteIndex> deleteMapMaintainer;

public ChangelogMergeTreeRewriter(
int maxLevel,
Expand All @@ -57,12 +62,14 @@ public ChangelogMergeTreeRewriter(
MergeFunctionFactory<KeyValue> mfFactory,
MergeSorter mergeSorter,
RecordEqualiser valueEqualiser,
boolean changelogRowDeduplicate) {
boolean changelogRowDeduplicate,
@Nullable IndexMaintainer<KeyValue, DeleteIndex> deleteMapMaintainer) {
super(readerFactory, writerFactory, keyComparator, mfFactory, mergeSorter);
this.maxLevel = maxLevel;
this.mergeEngine = mergeEngine;
this.valueEqualiser = valueEqualiser;
this.changelogRowDeduplicate = changelogRowDeduplicate;
this.deleteMapMaintainer = deleteMapMaintainer;
}

protected abstract boolean rewriteChangelog(
Expand Down Expand Up @@ -159,6 +166,11 @@ private CompactResult rewriteChangelogCompaction(
.map(x -> x.upgrade(outputLevel))
.collect(Collectors.toList());

if (deleteMapMaintainer != null) {
for (DataFileMeta dataFileMeta : before) {
deleteMapMaintainer.delete(dataFileMeta.fileName());
}
}
return new CompactResult(before, after, changelogFileWriter.result());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.codegen.RecordEqualiser;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.index.IndexMaintainer;
import org.apache.paimon.index.delete.DeleteIndex;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.KeyValueFileReaderFactory;
import org.apache.paimon.io.KeyValueFileWriterFactory;
Expand All @@ -31,6 +33,8 @@
import org.apache.paimon.mergetree.SortedRun;
import org.apache.paimon.utils.Filter;

import javax.annotation.Nullable;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Comparator;
Expand Down Expand Up @@ -59,7 +63,8 @@ public FirstRowMergeTreeCompactRewriter(
MergeFunctionFactory<KeyValue> mfFactory,
MergeSorter mergeSorter,
RecordEqualiser valueEqualiser,
boolean changelogRowDeduplicate) {
boolean changelogRowDeduplicate,
@Nullable IndexMaintainer<KeyValue, DeleteIndex> deleteMapMaintainer) {
super(
maxLevel,
mergeEngine,
Expand All @@ -69,7 +74,8 @@ public FirstRowMergeTreeCompactRewriter(
mfFactory,
mergeSorter,
valueEqualiser,
changelogRowDeduplicate);
changelogRowDeduplicate,
deleteMapMaintainer);
this.containsLevels = containsLevels;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ public FullChangelogMergeTreeCompactRewriter(
mfFactory,
mergeSorter,
valueComparator,
changelogRowDeduplicate);
changelogRowDeduplicate,
null);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,12 @@
import org.apache.paimon.KeyValue;
import org.apache.paimon.codegen.RecordEqualiser;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.index.IndexMaintainer;
import org.apache.paimon.index.delete.DeleteIndex;
import org.apache.paimon.types.RowKind;

import javax.annotation.Nullable;

import java.util.function.Function;

import static org.apache.paimon.utils.Preconditions.checkArgument;
Expand Down Expand Up @@ -53,12 +57,14 @@ public class LookupChangelogMergeFunctionWrapper implements MergeFunctionWrapper
private final KeyValue reusedAfter = new KeyValue();
private final RecordEqualiser valueEqualiser;
private final boolean changelogRowDeduplicate;
private final @Nullable IndexMaintainer<KeyValue, DeleteIndex> deleteMapMaintainer;

public LookupChangelogMergeFunctionWrapper(
MergeFunctionFactory<KeyValue> mergeFunctionFactory,
Function<InternalRow, KeyValue> lookup,
RecordEqualiser valueEqualiser,
boolean changelogRowDeduplicate) {
boolean changelogRowDeduplicate,
@Nullable IndexMaintainer<KeyValue, DeleteIndex> deleteMapMaintainer) {
MergeFunction<KeyValue> mergeFunction = mergeFunctionFactory.create();
checkArgument(
mergeFunction instanceof LookupMergeFunction,
Expand All @@ -69,6 +75,7 @@ public LookupChangelogMergeFunctionWrapper(
this.lookup = lookup;
this.valueEqualiser = valueEqualiser;
this.changelogRowDeduplicate = changelogRowDeduplicate;
this.deleteMapMaintainer = deleteMapMaintainer;
}

@Override
Expand Down Expand Up @@ -113,6 +120,9 @@ public ChangelogResult getResult() {
mergeFunction2.add(result);
result = mergeFunction2.getResult();
setChangelog(highLevel, result);
if (highLevel.isAdd() && deleteMapMaintainer != null) {
deleteMapMaintainer.notifyNewRecord(highLevel);
}
} else {
setChangelog(null, result);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,17 @@
import org.apache.paimon.KeyValue;
import org.apache.paimon.codegen.RecordEqualiser;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.index.IndexMaintainer;
import org.apache.paimon.index.delete.DeleteIndex;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.KeyValueFileReaderFactory;
import org.apache.paimon.io.KeyValueFileWriterFactory;
import org.apache.paimon.mergetree.LookupLevels;
import org.apache.paimon.mergetree.MergeSorter;
import org.apache.paimon.mergetree.SortedRun;

import javax.annotation.Nullable;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Comparator;
Expand Down Expand Up @@ -56,7 +60,8 @@ public LookupMergeTreeCompactRewriter(
MergeFunctionFactory<KeyValue> mfFactory,
MergeSorter mergeSorter,
RecordEqualiser valueEqualiser,
boolean changelogRowDeduplicate) {
boolean changelogRowDeduplicate,
@Nullable IndexMaintainer<KeyValue, DeleteIndex> deleteMapMaintainer) {
super(
maxLevel,
mergeEngine,
Expand All @@ -66,7 +71,8 @@ public LookupMergeTreeCompactRewriter(
mfFactory,
mergeSorter,
valueEqualiser,
changelogRowDeduplicate);
changelogRowDeduplicate,
deleteMapMaintainer);
this.lookupLevels = lookupLevels;
}

Expand Down Expand Up @@ -110,7 +116,8 @@ protected MergeFunctionWrapper<ChangelogResult> createMergeWrapper(int outputLev
}
},
valueEqualiser,
changelogRowDeduplicate);
changelogRowDeduplicate,
deleteMapMaintainer);
}

@Override
Expand Down
Loading

0 comments on commit 85a069b

Please sign in to comment.