Skip to content

Commit

Permalink
[core] Introduce ExpireChangelogImpl to decouple the changelog lifecycle
Browse files Browse the repository at this point in the history
  • Loading branch information
Aitozi committed Mar 5, 2024
1 parent eef403b commit b020e1f
Show file tree
Hide file tree
Showing 15 changed files with 538 additions and 53 deletions.
6 changes: 6 additions & 0 deletions docs/layouts/shortcodes/generated/core_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,12 @@
<td>Boolean</td>
<td>Whether to generate -U, +U changelog for the same record. This configuration is only valid for the changelog-producer is lookup or full-compaction.</td>
</tr>
<tr>
<td><h5>changelog.time-retained</h5></td>
<td style="word-wrap: break-word;">1 h</td>
<td>Duration</td>
<td>The minimum time of completed changelog to retain.</td>
</tr>
<tr>
<td><h5>commit.callback.#.param</h5></td>
<td style="word-wrap: break-word;">(none)</td>
Expand Down
10 changes: 10 additions & 0 deletions paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,12 @@ public class CoreOptions implements Serializable {
.defaultValue(Duration.ofHours(1))
.withDescription("The maximum time of completed snapshots to retain.");

/**
 * Duration option {@code changelog.time-retained}; defaults to one hour. Its value is exposed
 * through {@code changelogTimeRetain()}.
 *
 * <p>NOTE(review): the description below says "minimum" while the sibling
 * {@code snapshot.time-retained} description says "maximum" - confirm which wording is
 * intended. The generated configuration documentation repeats this text, so both must be
 * changed together.
 */
public static final ConfigOption<Duration> CHANGELOG_TIME_RETAINED =
key("changelog.time-retained")
.durationType()
.defaultValue(Duration.ofHours(1))
.withDescription("The minimum time of completed changelog to retain.");

public static final ConfigOption<ExpireExecutionMode> SNAPSHOT_EXPIRE_EXECUTION_MODE =
key("snapshot.expire.execution-mode")
.enumType(ExpireExecutionMode.class)
Expand Down Expand Up @@ -1235,6 +1241,10 @@ public Duration snapshotTimeRetain() {
return options.get(SNAPSHOT_TIME_RETAINED);
}

/**
 * Reads the configured {@code changelog.time-retained} value.
 *
 * @return the value of {@link #CHANGELOG_TIME_RETAINED} from the wrapped options
 */
public Duration changelogTimeRetain() {
    final Duration retained = options.get(CHANGELOG_TIME_RETAINED);
    return retained;
}

/**
 * Reads the configured {@code snapshot.expire.execution-mode} value.
 *
 * @return the value of {@link #SNAPSHOT_EXPIRE_EXECUTION_MODE} from the wrapped options
 */
public ExpireExecutionMode snapshotExpireExecutionMode() {
    final ExpireExecutionMode mode = options.get(SNAPSHOT_EXPIRE_EXECUTION_MODE);
    return mode;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,17 @@ protected void recordDeletionBuckets(ManifestEntry entry) {
.add(entry.bucket());
}

protected void cleanUnusedManifests(
/**
 * Deletes the changelog manifest files referenced by the given snapshot, followed by the
 * changelog manifest list itself. Does nothing when the snapshot has no changelog manifest
 * list.
 *
 * @param snapshot the snapshot (or changelog metadata) whose changelog manifests are removed
 */
public void cleanUnusedChangelogManifests(Snapshot snapshot) {
    if (snapshot.changelogManifestList() == null) {
        // nothing was recorded for this snapshot, so there is nothing to clean
        return;
    }

    // remove every manifest listed first, then the list file that references them
    deleteFiles(
            tryReadManifestList(snapshot.changelogManifestList()),
            changelogManifest -> manifestFile.delete(changelogManifest.fileName()));
    manifestList.delete(snapshot.changelogManifestList());
}

public void cleanUnusedManifests(
Snapshot snapshot, Set<String> skippingSet, boolean deleteChangelog) {
// clean base and delta manifests
List<String> toDeleteManifests = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,15 @@ public ExpireSnapshots newExpireSnapshots() {
coreOptions().snapshotExpireCleanEmptyDirectories());
}

/**
 * Builds an {@link ExpireChangelogImpl} wired with this table's snapshot manager and a fresh
 * snapshot deletion helper. Empty-directory cleaning follows the snapshot-expire setting.
 */
@Override
public ExpireSnapshots newExpireChangelog() {
    // value handed to the keepLastOne constructor argument of ExpireChangelogImpl
    final boolean keepLastOne = false;
    return new ExpireChangelogImpl(
            snapshotManager(),
            store().newSnapshotDeletion(),
            coreOptions().snapshotExpireCleanEmptyDirectories(),
            keepLastOne);
}

@Override
public TableCommitImpl newCommit(String commitUser) {
// Compatibility with previous design, the main branch is written by default
Expand All @@ -301,17 +310,21 @@ public TableCommitImpl newCommit(String commitUser, String branchName) {
CoreOptions options = coreOptions();
Runnable snapshotExpire = null;
if (!options.writeOnly()) {
ExpireSnapshots expireChangelog =
newExpireChangelog().maxDeletes(options.snapshotExpireLimit());
ExpireSnapshots expireSnapshots =
newExpireSnapshots()
.retainMax(options.snapshotNumRetainMax())
.retainMin(options.snapshotNumRetainMin())
.maxDeletes(options.snapshotExpireLimit());
long snapshotTimeRetain = options.snapshotTimeRetain().toMillis();
long changelogTimeRetain = options.changelogTimeRetain().toMillis();
snapshotExpire =
() ->
expireSnapshots
.olderThanMills(System.currentTimeMillis() - snapshotTimeRetain)
.expire();
() -> {
long current = System.currentTimeMillis();
expireSnapshots.olderThanMills(current - snapshotTimeRetain).expire();
expireChangelog.olderThanMills(current - changelogTimeRetain).expire();
};
}

return new TableCommitImpl(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.table;

import org.apache.paimon.Snapshot;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.operation.SnapshotDeletion;
import org.apache.paimon.utils.SnapshotManager;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.UncheckedIOException;

/**
 * Cleans up the long-lived changelog kept in the changelog directory.
 *
 * <p>{@link #expire()} walks the changelog ids from the earliest one and removes every changelog
 * whose {@code timeMillis} is older than the threshold set via {@link #olderThanMills(long)},
 * visiting at most {@link #maxDeletes(int)} ids per call. Count based retention ({@link
 * #retainMax(int)} / {@link #retainMin(int)}) is not supported.
 */
public class ExpireChangelogImpl implements ExpireSnapshots {

    public static final Logger LOG = LoggerFactory.getLogger(ExpireChangelogImpl.class);

    private final SnapshotManager snapshotManager;
    private final SnapshotDeletion snapshotDeletion;
    private final boolean cleanEmptyDirectories;
    /** Whether to keep the last changelog of the visited range even when it is old enough. */
    private final boolean keepLastOne;

    private long olderThanMills = 0;
    private int maxDeletes = Integer.MAX_VALUE;

    /**
     * @param snapshotManager used to locate, read and delete changelog metadata and hint files
     * @param snapshotDeletion used to delete changelog data files, manifests and directories
     * @param cleanEmptyDirectories whether {@code cleanDataDirectories()} is invoked after expiring
     * @param keepLastOne whether the last changelog of the visited range is always retained
     */
    public ExpireChangelogImpl(
            SnapshotManager snapshotManager,
            SnapshotDeletion snapshotDeletion,
            boolean cleanEmptyDirectories,
            boolean keepLastOne) {
        this.snapshotManager = snapshotManager;
        this.snapshotDeletion = snapshotDeletion;
        this.cleanEmptyDirectories = cleanEmptyDirectories;
        this.keepLastOne = keepLastOne;
    }

    /**
     * Not supported for changelog expiration.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public ExpireChangelogImpl retainMax(int retainMax) {
        throw new UnsupportedOperationException(
                "ExpireChangelogImpl does not support retainMax.");
    }

    /**
     * Not supported for changelog expiration.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public ExpireChangelogImpl retainMin(int retainMin) {
        throw new UnsupportedOperationException(
                "ExpireChangelogImpl does not support retainMin.");
    }

    /**
     * Sets the expiration threshold: changelogs whose {@code timeMillis} is strictly smaller than
     * this value are eligible for expiration.
     */
    @Override
    public ExpireChangelogImpl olderThanMills(long olderThanMills) {
        this.olderThanMills = olderThanMills;
        return this;
    }

    /** Sets the maximum number of changelog ids visited by a single {@link #expire()} call. */
    @Override
    public ExpireChangelogImpl maxDeletes(int maxDeletes) {
        this.maxDeletes = maxDeletes;
        return this;
    }

    /**
     * Expires changelogs starting from the earliest long-lived changelog id.
     *
     * @return the size of the expired id range, or 0 when there is no long-lived changelog
     */
    @Override
    public int expire() {
        Long latest = snapshotManager.latestLongLivedChangelogId();
        if (latest == null) {
            return 0;
        }
        Long earliest = snapshotManager.earliestLongLivedChangelogId();
        if (earliest == null) {
            return 0;
        }

        // inclusive upper bound; the sum is evaluated as long so Integer.MAX_VALUE cannot overflow
        long maxId = Math.min(maxDeletes + earliest - 1, latest);
        for (long id = earliest; id <= maxId; id++) {
            if (snapshotManager.longLivedChangelogExists(id)
                    && olderThanMills <= snapshotManager.longLivedChangelog(id).timeMillis()) {
                // 'id' is the first changelog that is still inside the retention window, so it
                // must survive: use it as the exclusive end regardless of keepLastOne.
                return expireUntil(earliest, id);
            }
        }
        // every visited changelog is old enough; optionally spare the last visited one
        return expireUntil(earliest, keepLastOne ? maxId : maxId + 1);
    }

    /**
     * Deletes the changelogs in {@code [earliestId, endExclusiveId)} and then records the end of
     * the range as the new earliest hint.
     *
     * @param earliestId first changelog id to delete (inclusive)
     * @param endExclusiveId end of the range (exclusive); values below {@code earliestId} are
     *     treated as an empty range
     * @return the size of the expired id range, never negative
     */
    public int expireUntil(long earliestId, long endExclusiveId) {
        // an inverted range (e.g. maxDeletes <= 0) must neither yield a negative count nor move
        // the earliest hint backwards
        final long end = Math.max(earliestId, endExclusiveId);
        LOG.debug("Changelog expire range is [{}, {})", earliestId, end);

        for (long id = earliestId; id < end; id++) {
            if (!snapshotManager.longLivedChangelogExists(id)) {
                // expire() already tolerates gaps in the id range; there is no metadata left to
                // read for this id, so there is nothing that could be cleaned
                continue;
            }
            LOG.debug("Ready to delete changelog files from snapshot #{}", id);
            Snapshot snapshot = snapshotManager.longLivedChangelog(id);
            // delete changelog files
            if (snapshot.changelogManifestList() != null) {
                snapshotDeletion.deleteAddedDataFiles(snapshot.changelogManifestList());
            }
            snapshotDeletion.cleanUnusedChangelogManifests(snapshot);
            // the metadata file goes last, after the files it references have been removed
            snapshotManager.fileIO().deleteQuietly(snapshotManager.longLivedChangelogPath(id));
        }

        if (cleanEmptyDirectories) {
            snapshotDeletion.cleanDataDirectories();
        }
        writeEarliestHintFile(end);
        return (int) (end - earliestId);
    }

    /** Commits the earliest-changelog hint, rethrowing I/O failures unchecked. */
    private void writeEarliestHintFile(long earliest) {
        try {
            snapshotManager.commitLongLivedChangelogEarliestHint(earliest);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    @VisibleForTesting
    public long getOlderThanMills() {
        return olderThanMills;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -186,17 +186,6 @@ public int expireUntil(long earliestId, long endExclusiveId) {
snapshotDeletion.cleanUnusedDataFiles(snapshot, skipper);
}

// delete changelog files
for (long id = beginInclusiveId; id < endExclusiveId; id++) {
if (LOG.isDebugEnabled()) {
LOG.debug("Ready to delete changelog files from snapshot #" + id);
}
Snapshot snapshot = snapshotManager.snapshot(id);
if (snapshot.changelogManifestList() != null) {
snapshotDeletion.deleteAddedDataFiles(snapshot.changelogManifestList());
}
}

// data files and changelog files in bucket directories has been deleted
// then delete changed bucket directories if they are empty
if (cleanEmptyDirectories) {
Expand All @@ -215,10 +204,15 @@ public int expireUntil(long earliestId, long endExclusiveId) {
}

Snapshot snapshot = snapshotManager.snapshot(id);
snapshotDeletion.cleanUnusedManifests(snapshot, skippingSet);
snapshotDeletion.cleanUnusedManifests(snapshot, skippingSet, false);

// delete snapshot last
snapshotManager.fileIO().deleteQuietly(snapshotManager.snapshotPath(id));
// move snapshot metadata to changelog
try {
snapshotManager.moveToChangelog(id);
snapshotManager.commitLongLivedChangelogLatestHint(id);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}

writeEarliestHint(endExclusiveId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,4 +156,12 @@ default ExpireSnapshots newExpireSnapshots() {
"Readonly Table %s does not support expireSnapshots.",
this.getClass().getSimpleName()));
}

/**
 * Read-only tables cannot expire changelog.
 *
 * @throws UnsupportedOperationException always, naming the concrete table class
 */
@Override
default ExpireSnapshots newExpireChangelog() {
    final String tableType = this.getClass().getSimpleName();
    final String message =
            String.format("Readonly Table %s does not support expireChangelog.", tableType);
    throw new UnsupportedOperationException(message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ public RollbackHelper(
public void cleanLargerThan(Snapshot retainedSnapshot) {
// clean data files
List<Snapshot> cleanedSnapshots = cleanSnapshotsDataFiles(retainedSnapshot);
List<Snapshot> cleanedChangelogs = cleanLongLivedChangelogDataFiles(retainedSnapshot);
List<Snapshot> cleanedTags = cleanTagsDataFiles(retainedSnapshot);

// clean manifests
Expand All @@ -78,6 +79,10 @@ public void cleanLargerThan(Snapshot retainedSnapshot) {
snapshotDeletion.cleanUnusedManifests(snapshot, manifestsSkippingSet);
}

for (Snapshot snapshot : cleanedChangelogs) {
snapshotDeletion.cleanUnusedChangelogManifests(snapshot);
}

cleanedTags.removeAll(cleanedSnapshots);
for (Snapshot snapshot : cleanedTags) {
tagDeletion.cleanUnusedManifests(snapshot, manifestsSkippingSet);
Expand Down Expand Up @@ -121,6 +126,33 @@ private List<Snapshot> cleanSnapshotsDataFiles(Snapshot retainedSnapshot) {
return toBeCleaned;
}

/**
 * Removes the long-lived changelogs whose id is larger than the retained snapshot's id: their
 * metadata files, the data files added by their changelog manifests, and data directories via
 * {@code cleanDataDirectories()}.
 *
 * @param retainedSnapshot the snapshot to roll back to; changelogs with a larger id are cleaned
 * @return the metadata of the cleaned changelogs, newest first, so that the caller can clean
 *     their manifests afterwards; empty when there is no long-lived changelog
 */
private List<Snapshot> cleanLongLivedChangelogDataFiles(Snapshot retainedSnapshot) {
    Long earliest = snapshotManager.earliestLongLivedChangelogId();
    Long latest = snapshotManager.latestLongLivedChangelogId();
    if (earliest == null || latest == null) {
        return Collections.emptyList();
    }

    // delete the changelog metadata files first so that they cannot be read any more;
    // it is possible that some changelogs have already been expired, hence the lower bound
    List<Snapshot> toBeCleaned = new ArrayList<>();
    long to = Math.max(earliest, retainedSnapshot.id() + 1);
    for (long i = latest; i >= to; i--) {
        // NOTE(review): other code reads this metadata via longLivedChangelog(id) - confirm
        // that changelog(i) resolves to the same long-lived changelog file.
        toBeCleaned.add(snapshotManager.changelog(i));
        fileIO.deleteQuietly(snapshotManager.longLivedChangelogPath(i));
    }

    // delete data files of changelog
    for (Snapshot snapshot : toBeCleaned) {
        // the changelog manifest list is optional; the other changelog cleaning paths guard
        // against null before touching it, so do the same here
        if (snapshot.changelogManifestList() != null) {
            snapshotDeletion.deleteAddedDataFiles(snapshot.changelogManifestList());
        }
    }

    // delete directories
    snapshotDeletion.cleanDataDirectories();

    return toBeCleaned;
}

private List<Snapshot> cleanTagsDataFiles(Snapshot retainedSnapshot) {
SortedMap<Snapshot, List<String>> tags = tagManager.tags();
if (tags.isEmpty()) {
Expand Down
3 changes: 3 additions & 0 deletions paimon-core/src/main/java/org/apache/paimon/table/Table.java
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,9 @@ public interface Table extends Serializable {
@Experimental
ExpireSnapshots newExpireSnapshots();

/**
 * Returns a new {@link ExpireSnapshots} used to expire changelog. Implementations that cannot
 * expire changelog may throw {@link UnsupportedOperationException}.
 */
@Experimental
ExpireSnapshots newExpireChangelog();

// =============== Read & Write Operations ==================

/** Returns a new read builder. */
Expand Down
Loading

0 comments on commit b020e1f

Please sign in to comment.