Skip to content

Commit

Permalink
[core] commit.force-create-snapshot to force a snapshot to be gener…
Browse files Browse the repository at this point in the history
…ated when committing (#2964)
  • Loading branch information
huyuanfeng2018 authored Mar 7, 2024
1 parent ec5ebeb commit 0b3509d
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 4 deletions.
6 changes: 6 additions & 0 deletions docs/layouts/shortcodes/generated/core_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,12 @@
<td>Boolean</td>
<td>Whether to force a compaction before commit.</td>
</tr>
<tr>
<td><h5>commit.force-create-snapshot</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Whether to force create snapshot on commit.</td>
</tr>
<tr>
<td><h5>compaction.max-size-amplification-percent</h5></td>
<td style="word-wrap: break-word;">200</td>
Expand Down
10 changes: 10 additions & 0 deletions paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -1069,6 +1069,12 @@ public class CoreOptions implements Serializable {
.defaultValue(MemorySize.ofMebiBytes(10))
.withDescription("The threshold for read file async.");

public static final ConfigOption<Boolean> COMMIT_FORCE_CREATE_SNAPSHOT =
key("commit.force-create-snapshot")
.booleanType()
.defaultValue(false)
.withDescription("Whether to force create snapshot on commit.");

private final Options options;

public CoreOptions(Map<String, String> options) {
Expand Down Expand Up @@ -1563,6 +1569,10 @@ public String sinkWatermarkTimeZone() {
return options.get(SINK_WATERMARK_TIME_ZONE);
}

public boolean forceCreatingSnapshot() {
return options.get(COMMIT_FORCE_CREATE_SNAPSHOT);
}

public Map<String, String> getFieldDefaultValues() {
Map<String, String> defaultValues = new HashMap<>();
String fieldPrefix = FIELDS_PREFIX + ".";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -325,8 +325,9 @@ public TableCommitImpl newCommit(String commitUser, String branchName) {
catalogEnvironment.lockFactory().create(),
CoreOptions.fromMap(options()).consumerExpireTime(),
new ConsumerManager(fileIO, path),
options.snapshotExpireExecutionMode(),
name());
coreOptions().snapshotExpireExecutionMode(),
name(),
coreOptions().forceCreatingSnapshot());
}

private List<CommitCallback> createCommitCallbacks() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ public class TableCommitImpl implements InnerTableCommit {

@Nullable private Map<String, String> overwritePartition = null;
private boolean batchCommitted = false;
private final boolean forceCreatingSnapshot;

public TableCommitImpl(
FileStoreCommit commit,
Expand All @@ -99,7 +100,8 @@ public TableCommitImpl(
@Nullable Duration consumerExpireTime,
ConsumerManager consumerManager,
ExpireExecutionMode expireExecutionMode,
String tableName) {
String tableName,
boolean forceCreatingSnapshot) {
commit.withLock(lock);
if (partitionExpire != null) {
partitionExpire.withLock(lock);
Expand All @@ -124,10 +126,12 @@ public TableCommitImpl(
this.expireError = new AtomicReference<>(null);

this.tableName = tableName;
this.forceCreatingSnapshot = forceCreatingSnapshot;
}

public boolean forceCreatingSnapshot() {
return tagAutoCreation != null && tagAutoCreation.forceCreatingSnapshot();
return this.forceCreatingSnapshot
|| (tagAutoCreation != null && tagAutoCreation.forceCreatingSnapshot());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,24 @@ public void testEmptyCommit() throws Exception {
assertThat(snapshot).isNull();
}

@Test
public void testForceCreateSnapshotCommit() throws Exception {
FileStoreTable table =
createFileStoreTable(
options ->
options.set(
CoreOptions.COMMIT_FORCE_CREATE_SNAPSHOT.key(), "true"));

OneInputStreamOperatorTestHarness<Committable, Committable> testHarness =
createRecoverableTestHarness(table);
testHarness.open();

testHarness.snapshot(1, 1);
testHarness.notifyOfCompletedCheckpoint(1);
Snapshot snapshot = table.snapshotManager().latestSnapshot();
assertThat(snapshot).isNotNull();
}

@Test
public void testEmptyCommitWithProcessTimeTag() throws Exception {
FileStoreTable table =
Expand Down

0 comments on commit 0b3509d

Please sign in to comment.