Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
JingsongLi committed Apr 16, 2024
1 parent de5088c commit 6cd9abc
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -241,10 +241,18 @@ private void flushWriteBuffer(boolean waitForLatestCompaction, boolean forcedFul
@Override
public CommitIncrement prepareCommit(boolean waitCompaction) throws Exception {
flushWriteBuffer(waitCompaction, false);
trySyncLatestCompaction(
waitCompaction
|| commitForceCompact
|| compactManager.shouldWaitForPreparingCheckpoint());
if (commitForceCompact) {
waitCompaction = true;
}
// Decide again whether to wait here.
// For example, in the case of repeated failures in writing, it is possible that Level 0
// files were successfully committed, but failed to restart during the compaction phase,
// which may result in an increasing number of Level 0 files. This wait can avoid this
// situation.
if (compactManager.shouldWaitForPreparingCheckpoint()) {
waitCompaction = true;
}
trySyncLatestCompaction(waitCompaction);
return drainIncrement();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,12 +133,15 @@ private SchemaManager createTestingSchemaManager(Path path) {
}

private void recreateMergeTree(long targetFileSize) {
Options configuration = new Options();
configuration.set(CoreOptions.WRITE_BUFFER_SIZE, new MemorySize(4096 * 3));
configuration.set(CoreOptions.PAGE_SIZE, new MemorySize(4096));
configuration.set(CoreOptions.TARGET_FILE_SIZE, new MemorySize(targetFileSize));
configuration.set(CoreOptions.SORT_ENGINE, getSortEngine());
options = new CoreOptions(configuration);
Options options = new Options();
options.set(CoreOptions.WRITE_BUFFER_SIZE, new MemorySize(4096 * 3));
options.set(CoreOptions.PAGE_SIZE, new MemorySize(4096));
options.set(CoreOptions.TARGET_FILE_SIZE, new MemorySize(targetFileSize));
options.set(CoreOptions.SORT_ENGINE, getSortEngine());
options.set(
CoreOptions.NUM_SORTED_RUNS_STOP_TRIGGER,
options.get(CoreOptions.NUM_SORTED_RUNS_COMPACTION_TRIGGER) + 1);
this.options = new CoreOptions(options);
RowType keyType = new RowType(singletonList(new DataField(0, "k", new IntType())));
RowType valueType = new RowType(singletonList(new DataField(0, "v", new IntType())));

Expand Down Expand Up @@ -188,9 +191,9 @@ public List<DataField> valueFields(TableSchema schema) {
valueType,
flushingAvro,
pathFactoryMap,
options.targetFileSize());
writerFactory = writerFactoryBuilder.build(BinaryRow.EMPTY_ROW, 0, options);
compactWriterFactory = writerFactoryBuilder.build(BinaryRow.EMPTY_ROW, 0, options);
this.options.targetFileSize());
writerFactory = writerFactoryBuilder.build(BinaryRow.EMPTY_ROW, 0, this.options);
compactWriterFactory = writerFactoryBuilder.build(BinaryRow.EMPTY_ROW, 0, this.options);
writer = createMergeTreeWriter(Collections.emptyList());
}

Expand Down

0 comments on commit 6cd9abc

Please sign in to comment.