Skip to content

Commit

Permalink
[core] Refactor changelog to snapshot reference (#3134)
Browse files Browse the repository at this point in the history
  • Loading branch information
JingsongLi authored Apr 1, 2024
1 parent 6378e75 commit 84d6c55
Show file tree
Hide file tree
Showing 7 changed files with 52 additions and 115 deletions.
6 changes: 3 additions & 3 deletions docs/layouts/shortcodes/generated/core_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -64,19 +64,19 @@
</tr>
<tr>
<td><h5>changelog.num-retained.max</h5></td>
<td style="word-wrap: break-word;">infinite</td>
<td style="word-wrap: break-word;">(none)</td>
<td>Integer</td>
<td>The maximum number of completed changelog to retain. Should be greater than or equal to the minimum number.</td>
</tr>
<tr>
<td><h5>changelog.num-retained.min</h5></td>
<td style="word-wrap: break-word;">10</td>
<td style="word-wrap: break-word;">(none)</td>
<td>Integer</td>
<td>The minimum number of completed changelog to retain. Should be greater than or equal to 1.</td>
</tr>
<tr>
<td><h5>changelog.time-retained</h5></td>
<td style="word-wrap: break-word;">1 h</td>
<td style="word-wrap: break-word;">(none)</td>
<td>Duration</td>
<td>The maximum time of completed changelog to retain.</td>
</tr>
Expand Down
20 changes: 10 additions & 10 deletions paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -194,22 +194,21 @@ public class CoreOptions implements Serializable {
public static final ConfigOption<Integer> CHANGELOG_NUM_RETAINED_MIN =
key("changelog.num-retained.min")
.intType()
.defaultValue(10)
.noDefaultValue()
.withDescription(
"The minimum number of completed changelog to retain. Should be greater than or equal to 1.");

@Documentation.OverrideDefault("infinite")
public static final ConfigOption<Integer> CHANGELOG_NUM_RETAINED_MAX =
key("changelog.num-retained.max")
.intType()
.defaultValue(Integer.MAX_VALUE)
.noDefaultValue()
.withDescription(
"The maximum number of completed changelog to retain. Should be greater than or equal to the minimum number.");

public static final ConfigOption<Duration> CHANGELOG_TIME_RETAINED =
key("changelog.time-retained")
.durationType()
.defaultValue(Duration.ofHours(1))
.noDefaultValue()
.withDescription("The maximum time of completed changelog to retain.");

public static final ConfigOption<ExpireExecutionMode> SNAPSHOT_EXPIRE_EXECUTION_MODE =
Expand Down Expand Up @@ -1228,22 +1227,23 @@ public Duration snapshotTimeRetain() {
}

public int changelogNumRetainMin() {
return options.get(CHANGELOG_NUM_RETAINED_MIN);
return options.getOptional(CHANGELOG_NUM_RETAINED_MIN)
.orElse(options.get(SNAPSHOT_NUM_RETAINED_MIN));
}

public int changelogNumRetainMax() {
return options.get(CHANGELOG_NUM_RETAINED_MAX);
return options.getOptional(CHANGELOG_NUM_RETAINED_MAX)
.orElse(options.get(SNAPSHOT_NUM_RETAINED_MAX));
}

public Duration changelogTimeRetain() {
return options.get(CHANGELOG_TIME_RETAINED);
return options.getOptional(CHANGELOG_TIME_RETAINED)
.orElse(options.get(SNAPSHOT_TIME_RETAINED));
}

public boolean changelogLifecycleDecoupled() {
return changelogNumRetainMax() > snapshotNumRetainMax()
|| options.get(CHANGELOG_TIME_RETAINED)
.compareTo(options.get(SNAPSHOT_TIME_RETAINED))
> 0
|| changelogTimeRetain().compareTo(snapshotTimeRetain()) > 0
|| changelogNumRetainMin() > snapshotNumRetainMin();
}

Expand Down
54 changes: 18 additions & 36 deletions paimon-core/src/main/java/org/apache/paimon/Changelog.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,43 +38,25 @@
*/
public class Changelog extends Snapshot {

private static final int CURRENT_VERSION = Snapshot.CURRENT_VERSION;

public Changelog(
long id,
long schemaId,
String baseManifestList,
String deltaManifestList,
@Nullable String changelogManifestList,
@Nullable String indexManifest,
String commitUser,
long commitIdentifier,
CommitKind commitKind,
long timeMillis,
Map<Integer, Long> logOffsets,
@Nullable Long totalRecordCount,
@Nullable Long deltaRecordCount,
@Nullable Long changelogRecordCount,
@Nullable Long watermark,
@Nullable String statistics) {
public Changelog(Snapshot snapshot) {
this(
CURRENT_VERSION,
id,
schemaId,
baseManifestList,
deltaManifestList,
changelogManifestList,
indexManifest,
commitUser,
commitIdentifier,
commitKind,
timeMillis,
logOffsets,
totalRecordCount,
deltaRecordCount,
changelogRecordCount,
watermark,
statistics);
snapshot.version(),
snapshot.id(),
snapshot.schemaId(),
snapshot.baseManifestList(),
snapshot.deltaManifestList(),
snapshot.changelogManifestList(),
snapshot.indexManifest(),
snapshot.commitUser(),
snapshot.commitIdentifier(),
snapshot.commitKind(),
snapshot.timeMillis(),
snapshot.logOffsets(),
snapshot.totalRecordCount(),
snapshot.deltaRecordCount(),
snapshot.changelogRecordCount(),
snapshot.watermark(),
snapshot.statistics());
}

@JsonCreator
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,53 +222,9 @@ public int expireUntil(long earliestId, long endExclusiveId) {
}

Snapshot snapshot = snapshotManager.snapshot(id);
snapshotDeletion.cleanUnusedManifests(snapshot, skippingSet, !changelogDecoupled);
if (changelogDecoupled) {
Changelog changelog;
if (snapshot.changelogManifestList() != null) {
changelog =
new Changelog(
id,
snapshot.schemaId(),
null,
null,
snapshot.changelogManifestList(),
null,
snapshot.commitUser(),
snapshot.commitIdentifier(),
snapshot.commitKind(),
snapshot.timeMillis(),
snapshot.logOffsets(),
snapshot.totalRecordCount(),
null,
snapshot.changelogRecordCount(),
snapshot.watermark(),
null);
snapshotDeletion.cleanUnusedManifests(snapshot, skippingSet, false);
} else {
// no changelog
changelog =
new Changelog(
id,
snapshot.schemaId(),
null,
null,
null,
snapshot.indexManifest(),
snapshot.commitUser(),
snapshot.commitIdentifier(),
snapshot.commitKind(),
snapshot.timeMillis(),
snapshot.logOffsets(),
snapshot.changelogRecordCount(),
snapshot.totalRecordCount(),
snapshot.changelogRecordCount(),
snapshot.watermark(),
snapshot.statistics());
snapshotDeletion.cleanUnusedManifests(snapshot, skippingSet, true);
}
commitChangelog(changelog);
} else {
snapshotDeletion.cleanUnusedManifests(snapshot, skippingSet, true);
commitChangelog(new Changelog(snapshot));
}
snapshotManager.fileIO().deleteQuietly(snapshotManager.snapshotPath(id));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,6 @@ public void cleanLargerThan(Snapshot retainedSnapshot) {
}

for (Changelog changelog : cleanedChangelogs) {
if (changelog.deltaManifestList() != null) {
snapshotDeletion.cleanUnusedManifestList(
changelog.deltaManifestList(), new HashSet<>());
}
if (changelog.changelogManifestList() != null) {
snapshotDeletion.cleanUnusedManifestList(
changelog.changelogManifestList(), new HashSet<>());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,8 @@ public void testScanFromChangelog() throws Exception {
Options options = new Options();
options.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MAX, 2);
options.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MIN, 1);
options.set(CoreOptions.CHANGELOG_NUM_RETAINED_MAX, 20);
options.set(CoreOptions.CHANGELOG_NUM_RETAINED_MIN, 10);
options.set(CoreOptions.CHANGELOG_PRODUCER, CoreOptions.ChangelogProducer.INPUT);
FileStoreTable table =
createFileStoreTable(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,22 +145,23 @@ private Snapshot createSnapshotWithMillis(long id, long millis) {

private Changelog createChangelogWithMillis(long id, long millis) {
return new Changelog(
id,
0L,
null,
null,
null,
null,
null,
0L,
Snapshot.CommitKind.APPEND,
millis,
null,
null,
null,
null,
null,
null);
new Snapshot(
id,
0L,
null,
null,
null,
null,
null,
0L,
Snapshot.CommitKind.APPEND,
millis,
null,
null,
null,
null,
null,
null));
}

@Test
Expand Down

0 comments on commit 84d6c55

Please sign in to comment.