Skip to content

Commit

Permalink
[core] Introduce ExpireChangelogImpl to decouple the changelog lifecy…
Browse files Browse the repository at this point in the history
…cle (#3110)
  • Loading branch information
Aitozi authored Apr 1, 2024
1 parent 85f1cfd commit 46609f2
Show file tree
Hide file tree
Showing 29 changed files with 1,152 additions and 147 deletions.
18 changes: 18 additions & 0 deletions docs/layouts/shortcodes/generated/core_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,24 @@
<td>Boolean</td>
<td>Whether to generate -U, +U changelog for the same record. This configuration is only valid for the changelog-producer is lookup or full-compaction.</td>
</tr>
<tr>
<td><h5>changelog.num-retained.max</h5></td>
<td style="word-wrap: break-word;">infinite</td>
<td>Integer</td>
<td>The maximum number of completed changelog to retain. Should be greater than or equal to the minimum number.</td>
</tr>
<tr>
<td><h5>changelog.num-retained.min</h5></td>
<td style="word-wrap: break-word;">10</td>
<td>Integer</td>
<td>The minimum number of completed changelog to retain. Should be greater than or equal to 1.</td>
</tr>
<tr>
<td><h5>changelog.time-retained</h5></td>
<td style="word-wrap: break-word;">1 h</td>
<td>Duration</td>
<td>The maximum time of completed changelog to retain.</td>
</tr>
<tr>
<td><h5>commit.callback.#.param</h5></td>
<td style="word-wrap: break-word;">(none)</td>
Expand Down
41 changes: 41 additions & 0 deletions paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,27 @@ public class CoreOptions implements Serializable {
.defaultValue(Duration.ofHours(1))
.withDescription("The maximum time of completed snapshots to retain.");

/** Minimum number of completed changelogs to retain; defaults to 10. */
public static final ConfigOption<Integer> CHANGELOG_NUM_RETAINED_MIN =
key("changelog.num-retained.min")
.intType()
.defaultValue(10)
.withDescription(
"The minimum number of completed changelog to retain. Should be greater than or equal to 1.");

/**
* Maximum number of completed changelogs to retain. The default is {@code Integer.MAX_VALUE},
* which the documentation override below renders as "infinite".
*/
@Documentation.OverrideDefault("infinite")
public static final ConfigOption<Integer> CHANGELOG_NUM_RETAINED_MAX =
key("changelog.num-retained.max")
.intType()
.defaultValue(Integer.MAX_VALUE)
.withDescription(
"The maximum number of completed changelog to retain. Should be greater than or equal to the minimum number.");

/** Maximum time a completed changelog is retained; defaults to one hour. */
public static final ConfigOption<Duration> CHANGELOG_TIME_RETAINED =
key("changelog.time-retained")
.durationType()
.defaultValue(Duration.ofHours(1))
.withDescription("The maximum time of completed changelog to retain.");

public static final ConfigOption<ExpireExecutionMode> SNAPSHOT_EXPIRE_EXECUTION_MODE =
key("snapshot.expire.execution-mode")
.enumType(ExpireExecutionMode.class)
Expand Down Expand Up @@ -1214,6 +1235,26 @@ public Duration snapshotTimeRetain() {
return options.get(SNAPSHOT_TIME_RETAINED);
}

/** Returns the configured value of {@code changelog.num-retained.min}. */
public int changelogNumRetainMin() {
return options.get(CHANGELOG_NUM_RETAINED_MIN);
}

/** Returns the configured value of {@code changelog.num-retained.max}. */
public int changelogNumRetainMax() {
return options.get(CHANGELOG_NUM_RETAINED_MAX);
}

/** Returns the configured value of {@code changelog.time-retained}. */
public Duration changelogTimeRetain() {
return options.get(CHANGELOG_TIME_RETAINED);
}

/**
 * Tells whether changelog retention is configured to exceed snapshot retention.
 *
 * <p>The checks run in this order and stop at the first that holds: changelog max count greater
 * than snapshot max count, changelog time retained longer than snapshot time retained, changelog
 * min count greater than snapshot min count.
 *
 * @return {@code true} if any of the three changelog settings is strictly larger than its
 *     snapshot counterpart
 */
public boolean changelogLifecycleDecoupled() {
    if (changelogNumRetainMax() > snapshotNumRetainMax()) {
        return true;
    }

    Duration changelogRetention = options.get(CHANGELOG_TIME_RETAINED);
    Duration snapshotRetention = options.get(SNAPSHOT_TIME_RETAINED);
    if (changelogRetention.compareTo(snapshotRetention) > 0) {
        return true;
    }

    return changelogNumRetainMin() > snapshotNumRetainMin();
}

/** Returns the configured value of {@code snapshot.expire.execution-mode}. */
public ExpireExecutionMode snapshotExpireExecutionMode() {
return options.get(SNAPSHOT_EXPIRE_EXECUTION_MODE);
}
Expand Down
131 changes: 131 additions & 0 deletions paimon-core/src/main/java/org/apache/paimon/Changelog.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon;

import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.utils.JsonSerdeUtil;

import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty;

import javax.annotation.Nullable;

import java.io.IOException;
import java.util.Map;

/**
* The metadata of a changelog. It is generated from the snapshot file during expiration, so that
* the changelog of the table can outlive the snapshot's lifecycle. A table's changelog can come
* from one source:
*
* <ul>
*   <li>The changelog file, e.g. produced with changelog-producer = 'input'.
* </ul>
*
* <p>This class adds no fields of its own; it reuses the {@link Snapshot} structure and JSON
* property names.
*/
public class Changelog extends Snapshot {

// Same value as Snapshot.CURRENT_VERSION; stamped by the convenience constructor below.
private static final int CURRENT_VERSION = Snapshot.CURRENT_VERSION;

/**
* Creates a changelog whose version is {@code CURRENT_VERSION}. Every other argument is
* forwarded, in the same order, to the full constructor.
*/
public Changelog(
long id,
long schemaId,
String baseManifestList,
String deltaManifestList,
@Nullable String changelogManifestList,
@Nullable String indexManifest,
String commitUser,
long commitIdentifier,
CommitKind commitKind,
long timeMillis,
Map<Integer, Long> logOffsets,
@Nullable Long totalRecordCount,
@Nullable Long deltaRecordCount,
@Nullable Long changelogRecordCount,
@Nullable Long watermark,
@Nullable String statistics) {
this(
CURRENT_VERSION,
id,
schemaId,
baseManifestList,
deltaManifestList,
changelogManifestList,
indexManifest,
commitUser,
commitIdentifier,
commitKind,
timeMillis,
logOffsets,
totalRecordCount,
deltaRecordCount,
changelogRecordCount,
watermark,
statistics);
}

/**
* Full constructor, also used as the JSON creator. Each argument is bound to the JSON property
* named by the matching {@code FIELD_*} constant inherited from {@link Snapshot} and is passed
* through unchanged to the {@link Snapshot} constructor.
*/
@JsonCreator
public Changelog(
@JsonProperty(FIELD_VERSION) @Nullable Integer version,
@JsonProperty(FIELD_ID) long id,
@JsonProperty(FIELD_SCHEMA_ID) long schemaId,
@JsonProperty(FIELD_BASE_MANIFEST_LIST) String baseManifestList,
@JsonProperty(FIELD_DELTA_MANIFEST_LIST) String deltaManifestList,
@JsonProperty(FIELD_CHANGELOG_MANIFEST_LIST) @Nullable String changelogManifestList,
@JsonProperty(FIELD_INDEX_MANIFEST) @Nullable String indexManifest,
@JsonProperty(FIELD_COMMIT_USER) String commitUser,
@JsonProperty(FIELD_COMMIT_IDENTIFIER) long commitIdentifier,
@JsonProperty(FIELD_COMMIT_KIND) CommitKind commitKind,
@JsonProperty(FIELD_TIME_MILLIS) long timeMillis,
@JsonProperty(FIELD_LOG_OFFSETS) Map<Integer, Long> logOffsets,
@JsonProperty(FIELD_TOTAL_RECORD_COUNT) @Nullable Long totalRecordCount,
@JsonProperty(FIELD_DELTA_RECORD_COUNT) @Nullable Long deltaRecordCount,
@JsonProperty(FIELD_CHANGELOG_RECORD_COUNT) @Nullable Long changelogRecordCount,
@JsonProperty(FIELD_WATERMARK) @Nullable Long watermark,
@JsonProperty(FIELD_STATISTICS) @Nullable String statistics) {
super(
version,
id,
schemaId,
baseManifestList,
deltaManifestList,
changelogManifestList,
indexManifest,
commitUser,
commitIdentifier,
commitKind,
timeMillis,
logOffsets,
totalRecordCount,
deltaRecordCount,
changelogRecordCount,
watermark,
statistics);
}

/** Deserializes a {@link Changelog} from its JSON text using {@link JsonSerdeUtil}. */
public static Changelog fromJson(String json) {
return JsonSerdeUtil.fromJson(json, Changelog.class);
}

/**
* Reads the file at {@code path} through {@code fileIO.readFileUtf8} and deserializes its
* content with {@link #fromJson(String)}.
*
* @throws RuntimeException wrapping the {@link IOException} if the file cannot be read
*/
public static Changelog fromPath(FileIO fileIO, Path path) {
try {
String json = fileIO.readFileUtf8(path);
return Changelog.fromJson(json);
} catch (IOException e) {
throw new RuntimeException("Fails to read changelog from path " + path, e);
}
}
}
38 changes: 19 additions & 19 deletions paimon-core/src/main/java/org/apache/paimon/Snapshot.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,25 +69,25 @@ public class Snapshot {
public static final long FIRST_SNAPSHOT_ID = 1;

public static final int TABLE_STORE_02_VERSION = 1;
private static final int CURRENT_VERSION = 3;

private static final String FIELD_VERSION = "version";
private static final String FIELD_ID = "id";
private static final String FIELD_SCHEMA_ID = "schemaId";
private static final String FIELD_BASE_MANIFEST_LIST = "baseManifestList";
private static final String FIELD_DELTA_MANIFEST_LIST = "deltaManifestList";
private static final String FIELD_CHANGELOG_MANIFEST_LIST = "changelogManifestList";
private static final String FIELD_INDEX_MANIFEST = "indexManifest";
private static final String FIELD_COMMIT_USER = "commitUser";
private static final String FIELD_COMMIT_IDENTIFIER = "commitIdentifier";
private static final String FIELD_COMMIT_KIND = "commitKind";
private static final String FIELD_TIME_MILLIS = "timeMillis";
private static final String FIELD_LOG_OFFSETS = "logOffsets";
private static final String FIELD_TOTAL_RECORD_COUNT = "totalRecordCount";
private static final String FIELD_DELTA_RECORD_COUNT = "deltaRecordCount";
private static final String FIELD_CHANGELOG_RECORD_COUNT = "changelogRecordCount";
private static final String FIELD_WATERMARK = "watermark";
private static final String FIELD_STATISTICS = "statistics";
protected static final int CURRENT_VERSION = 3;

protected static final String FIELD_VERSION = "version";
protected static final String FIELD_ID = "id";
protected static final String FIELD_SCHEMA_ID = "schemaId";
protected static final String FIELD_BASE_MANIFEST_LIST = "baseManifestList";
protected static final String FIELD_DELTA_MANIFEST_LIST = "deltaManifestList";
protected static final String FIELD_CHANGELOG_MANIFEST_LIST = "changelogManifestList";
protected static final String FIELD_INDEX_MANIFEST = "indexManifest";
protected static final String FIELD_COMMIT_USER = "commitUser";
protected static final String FIELD_COMMIT_IDENTIFIER = "commitIdentifier";
protected static final String FIELD_COMMIT_KIND = "commitKind";
protected static final String FIELD_TIME_MILLIS = "timeMillis";
protected static final String FIELD_LOG_OFFSETS = "logOffsets";
protected static final String FIELD_TOTAL_RECORD_COUNT = "totalRecordCount";
protected static final String FIELD_DELTA_RECORD_COUNT = "deltaRecordCount";
protected static final String FIELD_CHANGELOG_RECORD_COUNT = "changelogRecordCount";
protected static final String FIELD_WATERMARK = "watermark";
protected static final String FIELD_STATISTICS = "statistics";

// version of snapshot
// null for paimon <= 0.2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,40 +151,14 @@ protected void recordDeletionBuckets(ManifestEntry entry) {
.add(entry.bucket());
}

protected void cleanUnusedManifests(
Snapshot snapshot, Set<String> skippingSet, boolean deleteChangelog) {
// clean base and delta manifests
List<String> toDeleteManifests = new ArrayList<>();
List<ManifestFileMeta> toExpireManifests = new ArrayList<>();
toExpireManifests.addAll(tryReadManifestList(snapshot.baseManifestList()));
toExpireManifests.addAll(tryReadManifestList(snapshot.deltaManifestList()));
for (ManifestFileMeta manifest : toExpireManifests) {
String fileName = manifest.fileName();
if (!skippingSet.contains(fileName)) {
toDeleteManifests.add(fileName);
// to avoid other snapshots trying to delete again
skippingSet.add(fileName);
}
}
deleteFiles(toDeleteManifests, manifestFile::delete);

toDeleteManifests.clear();
if (!skippingSet.contains(snapshot.baseManifestList())) {
toDeleteManifests.add(snapshot.baseManifestList());
}
if (!skippingSet.contains(snapshot.deltaManifestList())) {
toDeleteManifests.add(snapshot.deltaManifestList());
}
deleteFiles(toDeleteManifests, manifestList::delete);

// clean changelog manifests
if (deleteChangelog && snapshot.changelogManifestList() != null) {
deleteFiles(
tryReadManifestList(snapshot.changelogManifestList()),
manifest -> manifestFile.delete(manifest.fileName()));
manifestList.delete(snapshot.changelogManifestList());
/**
 * Deletes the statistics file referenced by the snapshot, unless the snapshot has none or the
 * file is listed in {@code skippingSet}.
 *
 * @param snapshot snapshot whose statistics reference is inspected
 * @param skippingSet names that must not be deleted
 */
public void cleanUnusedStatisticsManifests(Snapshot snapshot, Set<String> skippingSet) {
    String statistics = snapshot.statistics();
    if (statistics == null || skippingSet.contains(statistics)) {
        return;
    }
    statsFileHandler.deleteStats(statistics);
}

public void cleanUnusedIndexManifests(Snapshot snapshot, Set<String> skippingSet) {
// clean index manifests
String indexManifest = snapshot.indexManifest();
// check exists, it may have been deleted by other snapshots
Expand All @@ -199,11 +173,35 @@ protected void cleanUnusedManifests(
indexFileHandler.deleteManifest(indexManifest);
}
}
}

// clean statistics
if (snapshot.statistics() != null && !skippingSet.contains(snapshot.statistics())) {
statsFileHandler.deleteStats(snapshot.statistics());
/**
 * Deletes the manifest files listed in the given manifest list, and the list file itself,
 * leaving out every name present in {@code skippingSet}.
 *
 * <p>Each manifest file scheduled for deletion is added to {@code skippingSet} so that later
 * calls do not try to delete it again. The manifest list name itself is not added.
 *
 * @param manifestName name of the manifest list to clean
 * @param skippingSet names that must not be deleted; updated in place
 */
public void cleanUnusedManifestList(String manifestName, Set<String> skippingSet) {
    List<String> deletable = new ArrayList<>();

    for (ManifestFileMeta meta : tryReadManifestList(manifestName)) {
        String name = meta.fileName();
        if (skippingSet.contains(name)) {
            continue;
        }
        deletable.add(name);
        // remember it, so other snapshots do not attempt the same deletion
        skippingSet.add(name);
    }

    if (!skippingSet.contains(manifestName)) {
        deletable.add(manifestName);
    }

    // NOTE(review): the manifest list file itself is also removed through manifestFile::delete
    // rather than a manifest-list-specific deleter; confirm both resolve the name to the same
    // path.
    deleteFiles(deletable, manifestFile::delete);
}

/**
 * Cleans every manifest-related file referenced by the snapshot: the base and delta manifest
 * lists, optionally the changelog manifest list, then the index manifest and the statistics
 * file.
 *
 * @param snapshot snapshot whose referenced files are cleaned
 * @param skippingSet names that must not be deleted; may be extended by the manifest list
 *     cleanup
 * @param deleteChangelog whether the changelog manifest list should be cleaned as well
 */
public void cleanUnusedManifests(
        Snapshot snapshot, Set<String> skippingSet, boolean deleteChangelog) {
    cleanUnusedManifestList(snapshot.baseManifestList(), skippingSet);
    cleanUnusedManifestList(snapshot.deltaManifestList(), skippingSet);

    // The changelog manifest list is nullable (a snapshot may carry no changelog), so it must
    // not be handed to cleanUnusedManifestList when absent: a null name would be queued for
    // deletion.
    if (deleteChangelog && snapshot.changelogManifestList() != null) {
        cleanUnusedManifestList(snapshot.changelogManifestList(), skippingSet);
    }

    cleanUnusedIndexManifests(snapshot, skippingSet);
    cleanUnusedStatisticsManifests(snapshot, skippingSet);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,12 @@ public SnapshotDeletion(

/**
 * Cleans unused data files by delegating to the manifest-list overload with the snapshot's
 * delta manifest list.
 *
 * @param snapshot snapshot whose delta manifest list is processed
 * @param skipper predicate forwarded unchanged to the manifest-list overload
 */
@Override
public void cleanUnusedDataFiles(Snapshot snapshot, Predicate<ManifestEntry> skipper) {
    String deltaManifestList = snapshot.deltaManifestList();
    cleanUnusedDataFiles(deltaManifestList, skipper);
}

public void cleanUnusedDataFiles(String manifestList, Predicate<ManifestEntry> skipper) {
// try read manifests
List<String> manifestFileNames =
readManifestFileNames(tryReadManifestList(snapshot.deltaManifestList()));
List<String> manifestFileNames = readManifestFileNames(tryReadManifestList(manifestList));
List<ManifestEntry> manifestEntries;
// data file path -> (original manifest entry, extra file paths)
Map<Path, Pair<ManifestEntry, List<Path>>> dataFileToDelete = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@
import java.util.stream.Collectors;

import static org.apache.paimon.CoreOptions.BUCKET_KEY;
import static org.apache.paimon.CoreOptions.CHANGELOG_NUM_RETAINED_MAX;
import static org.apache.paimon.CoreOptions.CHANGELOG_NUM_RETAINED_MIN;
import static org.apache.paimon.CoreOptions.CHANGELOG_PRODUCER;
import static org.apache.paimon.CoreOptions.FIELDS_PREFIX;
import static org.apache.paimon.CoreOptions.FULL_COMPACTION_DELTA_COMMITS;
Expand Down Expand Up @@ -121,6 +123,15 @@ public static void validateTableSchema(TableSchema schema) {
+ " should not be larger than "
+ SNAPSHOT_NUM_RETAINED_MAX.key());

checkArgument(
options.changelogNumRetainMin() > 0,
CHANGELOG_NUM_RETAINED_MIN.key() + " should be at least 1");
checkArgument(
options.changelogNumRetainMin() <= options.changelogNumRetainMax(),
CHANGELOG_NUM_RETAINED_MIN.key()
+ " should not be larger than "
+ CHANGELOG_NUM_RETAINED_MAX.key());

// Get the format type here which will try to convert string value to {@code
// FileFormatType}. If the string value is illegal, an exception will be thrown.
CoreOptions.FileFormatType fileFormatType = options.formatType();
Expand Down
Loading

0 comments on commit 46609f2

Please sign in to comment.