Skip to content

Commit

Permalink
[common] supports configuring force-lookup.
Browse files Browse the repository at this point in the history
  • Loading branch information
liming30 committed Aug 7, 2024
1 parent 39ca57d commit 4bb0eab
Show file tree
Hide file tree
Showing 6 changed files with 136 additions and 9 deletions.
6 changes: 6 additions & 0 deletions docs/layouts/shortcodes/generated/core_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,12 @@
<td>Integer</td>
<td>Total level number, for example, there are 3 levels, including 0,1,2 levels.</td>
</tr>
<tr>
<td><h5>prefer-compaction-strategy</h5></td>
<td style="word-wrap: break-word;">universal</td>
<td>Enum</td>
<td>By default, universal compaction strategy is used. Note: This is just a suggestion for paimon to use this compaction strategy. Paimon will automatically decide which compaction strategy to use. For example, when 'changelog-producer' is set to 'lookup', paimon will automatically use the lookup compaction strategy.<br /><br />Possible values:<ul><li>"universal": Universal compaction strategy, mainly triggered by the number of sorted runs, space amplification, etc.</li><li>"lookup": When the L0 file is generated, compaction will be triggered as soon as possible.</li></ul></td>
</tr>
<tr>
<td><h5>num-sorted-run.compaction-trigger</h5></td>
<td style="word-wrap: break-word;">5</td>
Expand Down
48 changes: 47 additions & 1 deletion paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -454,6 +454,17 @@ public class CoreOptions implements Serializable {
text("append table: the default value is 256 MB."))
.build());

@Immutable
public static final ConfigOption<CompactionStrategy> PREFER_COMPACTION_STRATEGY =
key("prefer-compaction-strategy")
.enumType(CompactionStrategy.class)
.defaultValue(CompactionStrategy.UNIVERSAL)
.withDescription(
"By default, universal compaction strategy is used. Note: This is just a suggestion "
+ "for paimon to use this compaction strategy. Paimon will automatically decide "
+ "which compaction strategy to use. For example, when 'changelog-producer' is set "
+ "to 'lookup', paimon will automatically use the lookup compaction strategy.");

public static final ConfigOption<Integer> NUM_SORTED_RUNS_COMPACTION_TRIGGER =
key("num-sorted-run.compaction-trigger")
.intType()
Expand Down Expand Up @@ -1691,7 +1702,8 @@ public LookupStrategy lookupStrategy() {
return LookupStrategy.from(
mergeEngine().equals(MergeEngine.FIRST_ROW),
changelogProducer().equals(ChangelogProducer.LOOKUP),
deletionVectorsEnabled());
deletionVectorsEnabled(),
preferCompactionStrategy().equals(CompactionStrategy.LOOKUP));
}

public boolean changelogRowDeduplicate() {
Expand Down Expand Up @@ -2024,6 +2036,10 @@ public boolean metadataIcebergCompatible() {
return options.get(METADATA_ICEBERG_COMPATIBLE);
}

public CompactionStrategy preferCompactionStrategy() {
return options.get(PREFER_COMPACTION_STRATEGY);
}

/** Specifies the merge engine for table with primary key. */
public enum MergeEngine implements DescribedEnum {
DEDUPLICATE("deduplicate", "De-duplicate and keep the last row."),
Expand Down Expand Up @@ -2632,4 +2648,34 @@ public InlineElement getDescription() {
return text(description);
}
}

/** The compaction strategy of the LSM tree. */
public enum CompactionStrategy implements DescribedEnum {
UNIVERSAL(
"universal",
"Universal compaction strategy, mainly triggered by the number of sorted runs, space amplification, etc."),

LOOKUP(
"lookup",
"When the L0 file is generated, compaction will be triggered as soon as possible.");

private final String value;

private final String description;

CompactionStrategy(String value, String description) {
this.value = value;
this.description = description;
}

@Override
public String toString() {
return value;
}

@Override
public InlineElement getDescription() {
return text(description);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,22 @@ public class LookupStrategy {

public final boolean deletionVector;

private LookupStrategy(boolean isFirstRow, boolean produceChangelog, boolean deletionVector) {
private LookupStrategy(
boolean isFirstRow,
boolean produceChangelog,
boolean deletionVector,
boolean preferLookup) {
this.isFirstRow = isFirstRow;
this.produceChangelog = produceChangelog;
this.deletionVector = deletionVector;
this.needLookup = produceChangelog || deletionVector || isFirstRow;
this.needLookup = produceChangelog || deletionVector || isFirstRow || preferLookup;
}

public static LookupStrategy from(
boolean isFirstRow, boolean produceChangelog, boolean deletionVector) {
return new LookupStrategy(isFirstRow, produceChangelog, deletionVector);
boolean isFirstRow,
boolean produceChangelog,
boolean deletionVector,
boolean preferLookup) {
return new LookupStrategy(isFirstRow, produceChangelog, deletionVector, preferLookup);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public void testDeduplicate(boolean changelogRowDeduplicate) {
highLevel::get,
EQUALISER,
changelogRowDeduplicate,
LookupStrategy.from(false, true, false),
LookupStrategy.from(false, true, false, false),
null,
null);

Expand Down Expand Up @@ -233,7 +233,7 @@ public void testSum(boolean changelogRowDeduplicate) {
key -> null,
EQUALISER,
changelogRowDeduplicate,
LookupStrategy.from(false, true, false),
LookupStrategy.from(false, true, false, false),
null,
null);

Expand Down Expand Up @@ -322,7 +322,7 @@ public void testMergeHighLevelOrder() {
highLevel::get,
EQUALISER,
false,
LookupStrategy.from(false, true, false),
LookupStrategy.from(false, true, false, false),
null,
UserDefinedSeqComparator.create(
RowType.builder().field("f0", DataTypes.INT()).build(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.paimon.CoreOptions;
import org.apache.paimon.CoreOptions.ChangelogProducer;
import org.apache.paimon.CoreOptions.CompactionStrategy;
import org.apache.paimon.CoreOptions.LookupLocalFileType;
import org.apache.paimon.Snapshot;
import org.apache.paimon.data.BinaryString;
Expand All @@ -29,6 +30,8 @@
import org.apache.paimon.disk.IOManagerImpl;
import org.apache.paimon.fs.FileIOFinder;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.manifest.FileKind;
import org.apache.paimon.operation.FileStoreScan;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
Expand Down Expand Up @@ -65,9 +68,11 @@
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.CompatibilityTestUtils;
import org.apache.paimon.utils.Pair;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.ValueSource;

import java.io.File;
Expand Down Expand Up @@ -95,7 +100,11 @@
import static org.apache.paimon.CoreOptions.FILE_FORMAT;
import static org.apache.paimon.CoreOptions.LOOKUP_LOCAL_FILE_TYPE;
import static org.apache.paimon.CoreOptions.MERGE_ENGINE;
import static org.apache.paimon.CoreOptions.MergeEngine;
import static org.apache.paimon.CoreOptions.MergeEngine.AGGREGATE;
import static org.apache.paimon.CoreOptions.MergeEngine.DEDUPLICATE;
import static org.apache.paimon.CoreOptions.MergeEngine.FIRST_ROW;
import static org.apache.paimon.CoreOptions.MergeEngine.PARTIAL_UPDATE;
import static org.apache.paimon.CoreOptions.SNAPSHOT_EXPIRE_LIMIT;
import static org.apache.paimon.CoreOptions.SOURCE_SPLIT_OPEN_FILE_COST;
import static org.apache.paimon.CoreOptions.SOURCE_SPLIT_TARGET_SIZE;
Expand Down Expand Up @@ -1670,6 +1679,65 @@ public void testRollbackToTagWithChangelogDecoupled(String changelogProducer) th
// table-path/changelog/EARLIEST
}

@ParameterizedTest
@EnumSource(CoreOptions.MergeEngine.class)
public void testPreferLookupCompactionStrategy(CoreOptions.MergeEngine mergeEngine)
throws Exception {
Map<MergeEngine, Pair<Long, Long>> testData = new HashMap<>();
testData.put(DEDUPLICATE, Pair.of(50L, 100L));
testData.put(PARTIAL_UPDATE, Pair.of(null, 100L));
testData.put(AGGREGATE, Pair.of(30L, 70L));
testData.put(FIRST_ROW, Pair.of(100L, 70L));

Pair<Long, Long> currentTestData = testData.get(mergeEngine);
FileStoreTable table =
createFileStoreTable(
options -> {
options.set(
CoreOptions.PREFER_COMPACTION_STRATEGY,
CompactionStrategy.LOOKUP);
options.set(MERGE_ENGINE, mergeEngine);
if (mergeEngine == AGGREGATE) {
options.set("fields.b.aggregate-function", "sum");
}
});
StreamTableWrite write = table.newWrite(commitUser);
StreamTableCommit commit = table.newCommit(commitUser);
write.withIOManager(IOManager.create(tempDir.toString()));

// write data
write.write(rowData(1, 10, currentTestData.getLeft()));
commit.commit(1, write.prepareCommit(true, 1));
assertThat(table.snapshotManager().snapshotCount()).isEqualTo(2L);

write.write(rowData(1, 10, currentTestData.getRight()));
commit.commit(0, write.prepareCommit(true, 0));
write.close();
commit.close();
assertThat(table.snapshotManager().snapshotCount()).isEqualTo(4L);
assertThat(table.snapshotManager().latestSnapshot())
.matches(snapshot -> snapshot.commitKind() == COMPACT);

// 3 data files + bucket-0 directory
List<java.nio.file.Path> files =
Files.walk(new File(tablePath.toUri().getPath(), "pt=1/bucket-0").toPath())
.collect(Collectors.toList());
assertThat(files.size()).isEqualTo(4);

// 2 data files compact into 1 file
FileStoreScan scan = table.store().newScan().withKind(ScanMode.DELTA);
assertThat(scan.plan().files(FileKind.ADD).size()).isEqualTo(1);
assertThat(scan.plan().files(FileKind.DELETE).size()).isEqualTo(2);

// check result
List<Split> splits = toSplits(table.newSnapshotReader().read().dataSplits());
TableRead read = table.newRead();
assertThat(getResult(read, splits, binaryRow(1), 0, BATCH_ROW_TO_STRING))
.isEqualTo(
Collections.singletonList(
"1|10|100|binary|varbinary|mapKey:mapVal|multiset"));
}

private void assertReadChangelog(int id, FileStoreTable table) throws Exception {
// read the changelog at #{id}
table =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@

/**
* {@link StoreSinkWrite} for tables with lookup changelog producer and {@link
* org.apache.paimon.CoreOptions#CHANGELOG_PRODUCER_LOOKUP_WAIT} set to false.
* org.apache.paimon.CoreOptions#LOOKUP_WAIT} set to false.
*/
public class AsyncLookupSinkWrite extends StoreSinkWriteImpl {

Expand Down

0 comments on commit 4bb0eab

Please sign in to comment.