Skip to content

Commit

Permalink
[core] Increase default sorted run stop trigger (#3220)
Browse files Browse the repository at this point in the history
  • Loading branch information
JingsongLi authored Apr 22, 2024
1 parent 80cf3fc commit b1b0acd
Show file tree
Hide file tree
Showing 6 changed files with 37 additions and 17 deletions.
2 changes: 1 addition & 1 deletion docs/content/maintenance/write-performance.md
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ the threshold.
<td>No</td>
<td style="word-wrap: break-word;">(none)</td>
<td>Integer</td>
<td>The number of sorted runs that trigger the stopping of writes, the default value is 'num-sorted-run.compaction-trigger' + 1.</td>
<td>The number of sorted runs that trigger the stopping of writes, the default value is 'num-sorted-run.compaction-trigger' + 3.</td>
</tr>
</tbody>
</table>
Expand Down
2 changes: 1 addition & 1 deletion docs/layouts/shortcodes/generated/core_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,7 @@
<td><h5>num-sorted-run.stop-trigger</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>Integer</td>
<td>The number of sorted runs that trigger the stopping of writes, the default value is 'num-sorted-run.compaction-trigger' + 1.</td>
<td>The number of sorted runs that trigger the stopping of writes, the default value is 'num-sorted-run.compaction-trigger' + 3.</td>
</tr>
<tr>
<td><h5>page-size</h5></td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,7 @@ public class CoreOptions implements Serializable {
.noDefaultValue()
.withDescription(
"The number of sorted runs that trigger the stopping of writes,"
+ " the default value is 'num-sorted-run.compaction-trigger' + 1.");
+ " the default value is 'num-sorted-run.compaction-trigger' + 3.");

public static final ConfigOption<Integer> NUM_LEVELS =
key("num-levels")
Expand Down Expand Up @@ -1415,7 +1415,7 @@ public Duration optimizedCompactionInterval() {
public int numSortedRunStopTrigger() {
Integer stopTrigger = options.get(NUM_SORTED_RUNS_STOP_TRIGGER);
if (stopTrigger == null) {
stopTrigger = MathUtils.incrementSafely(numSortedRunCompactionTrigger());
stopTrigger = MathUtils.addSafely(numSortedRunCompactionTrigger(), 3);
}
return Math.max(numSortedRunCompactionTrigger(), stopTrigger);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,4 +101,13 @@ public static int incrementSafely(int a) {
}
return a + 1;
}

/** Safely add the given int value by another int value, ensuring that no overflow occurs. */
public static int addSafely(int a, int b) {
try {
return Math.addExact(a, b);
} catch (ArithmeticException e) {
return Integer.MAX_VALUE;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -241,10 +241,18 @@ private void flushWriteBuffer(boolean waitForLatestCompaction, boolean forcedFul
@Override
public CommitIncrement prepareCommit(boolean waitCompaction) throws Exception {
flushWriteBuffer(waitCompaction, false);
trySyncLatestCompaction(
waitCompaction
|| commitForceCompact
|| compactManager.shouldWaitForPreparingCheckpoint());
if (commitForceCompact) {
waitCompaction = true;
}
// Decide again whether to wait here.
// For example, in the case of repeated failures in writing, it is possible that Level 0
// files were successfully committed, but failed to restart during the compaction phase,
// which may result in an increasing number of Level 0 files. This wait can avoid this
// situation.
if (compactManager.shouldWaitForPreparingCheckpoint()) {
waitCompaction = true;
}
trySyncLatestCompaction(waitCompaction);
return drainIncrement();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,12 +133,15 @@ private SchemaManager createTestingSchemaManager(Path path) {
}

private void recreateMergeTree(long targetFileSize) {
Options configuration = new Options();
configuration.set(CoreOptions.WRITE_BUFFER_SIZE, new MemorySize(4096 * 3));
configuration.set(CoreOptions.PAGE_SIZE, new MemorySize(4096));
configuration.set(CoreOptions.TARGET_FILE_SIZE, new MemorySize(targetFileSize));
configuration.set(CoreOptions.SORT_ENGINE, getSortEngine());
options = new CoreOptions(configuration);
Options options = new Options();
options.set(CoreOptions.WRITE_BUFFER_SIZE, new MemorySize(4096 * 3));
options.set(CoreOptions.PAGE_SIZE, new MemorySize(4096));
options.set(CoreOptions.TARGET_FILE_SIZE, new MemorySize(targetFileSize));
options.set(CoreOptions.SORT_ENGINE, getSortEngine());
options.set(
CoreOptions.NUM_SORTED_RUNS_STOP_TRIGGER,
options.get(CoreOptions.NUM_SORTED_RUNS_COMPACTION_TRIGGER) + 1);
this.options = new CoreOptions(options);
RowType keyType = new RowType(singletonList(new DataField(0, "k", new IntType())));
RowType valueType = new RowType(singletonList(new DataField(0, "v", new IntType())));

Expand Down Expand Up @@ -188,9 +191,9 @@ public List<DataField> valueFields(TableSchema schema) {
valueType,
flushingAvro,
pathFactoryMap,
options.targetFileSize());
writerFactory = writerFactoryBuilder.build(BinaryRow.EMPTY_ROW, 0, options);
compactWriterFactory = writerFactoryBuilder.build(BinaryRow.EMPTY_ROW, 0, options);
this.options.targetFileSize());
writerFactory = writerFactoryBuilder.build(BinaryRow.EMPTY_ROW, 0, this.options);
compactWriterFactory = writerFactoryBuilder.build(BinaryRow.EMPTY_ROW, 0, this.options);
writer = createMergeTreeWriter(Collections.emptyList());
}

Expand Down

0 comments on commit b1b0acd

Please sign in to comment.