Skip to content

Commit

Permalink
[core] Introduce ExpireChangelogImpl to decouple the changelog lifecycle
Browse files Browse the repository at this point in the history
  • Loading branch information
Aitozi committed Mar 28, 2024
1 parent eef403b commit e750a4c
Show file tree
Hide file tree
Showing 18 changed files with 966 additions and 86 deletions.
18 changes: 18 additions & 0 deletions docs/layouts/shortcodes/generated/core_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,24 @@
<td>Boolean</td>
<td>Whether to generate -U, +U changelog for the same record. This configuration is only valid for the changelog-producer is lookup or full-compaction.</td>
</tr>
<tr>
<td><h5>changelog.num-retained.max</h5></td>
<td style="word-wrap: break-word;">infinite</td>
<td>Integer</td>
<td>The maximum number of completed changelog to retain. Should be greater than or equal to the minimum number.</td>
</tr>
<tr>
<td><h5>changelog.num-retained.min</h5></td>
<td style="word-wrap: break-word;">10</td>
<td>Integer</td>
<td>The minimum number of completed changelog to retain. Should be greater than or equal to 1.</td>
</tr>
<tr>
<td><h5>changelog.time-retained</h5></td>
<td style="word-wrap: break-word;">1 h</td>
<td>Duration</td>
<td>The maximum time of completed changelog to retain.</td>
</tr>
<tr>
<td><h5>commit.callback.#.param</h5></td>
<td style="word-wrap: break-word;">(none)</td>
Expand Down
41 changes: 41 additions & 0 deletions paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,27 @@ public class CoreOptions implements Serializable {
.defaultValue(Duration.ofHours(1))
.withDescription("The maximum time of completed snapshots to retain.");

/**
 * Integer option {@code changelog.num-retained.min}: the minimum number of completed changelog
 * to retain. Defaults to 10.
 */
public static final ConfigOption<Integer> CHANGELOG_NUM_RETAINED_MIN =
key("changelog.num-retained.min")
.intType()
.defaultValue(10)
.withDescription(
"The minimum number of completed changelog to retain. Should be greater than or equal to 1.");

/**
 * Integer option {@code changelog.num-retained.max}: the maximum number of completed changelog
 * to retain. The programmatic default is {@link Integer#MAX_VALUE}; the generated documentation
 * shows it as "infinite" through the {@code OverrideDefault} annotation.
 */
@Documentation.OverrideDefault("infinite")
public static final ConfigOption<Integer> CHANGELOG_NUM_RETAINED_MAX =
key("changelog.num-retained.max")
.intType()
.defaultValue(Integer.MAX_VALUE)
.withDescription(
"The maximum number of completed changelog to retain. Should be greater than or equal to the minimum number.");

/**
 * Duration option {@code changelog.time-retained}: the maximum time of completed changelog to
 * retain. Defaults to one hour.
 */
public static final ConfigOption<Duration> CHANGELOG_TIME_RETAINED =
key("changelog.time-retained")
.durationType()
.defaultValue(Duration.ofHours(1))
.withDescription("The maximum time of completed changelog to retain.");

public static final ConfigOption<ExpireExecutionMode> SNAPSHOT_EXPIRE_EXECUTION_MODE =
key("snapshot.expire.execution-mode")
.enumType(ExpireExecutionMode.class)
Expand Down Expand Up @@ -1235,6 +1256,26 @@ public Duration snapshotTimeRetain() {
return options.get(SNAPSHOT_TIME_RETAINED);
}

/** Returns the value configured for {@link #CHANGELOG_NUM_RETAINED_MIN}. */
public int changelogNumRetainMin() {
    final int minRetained = options.get(CHANGELOG_NUM_RETAINED_MIN);
    return minRetained;
}

/** Returns the value configured for {@link #CHANGELOG_NUM_RETAINED_MAX}. */
public int changelogNumRetainMax() {
    final int maxRetained = options.get(CHANGELOG_NUM_RETAINED_MAX);
    return maxRetained;
}

/** Returns the value configured for {@link #CHANGELOG_TIME_RETAINED}. */
public Duration changelogTimeRetain() {
    final Duration retention = options.get(CHANGELOG_TIME_RETAINED);
    return retention;
}

/**
 * Tells whether any changelog retention setting is strictly greater than the corresponding
 * snapshot retention setting.
 *
 * @return {@code true} if the changelog max retained number, the changelog retained time, or
 *     the changelog min retained number exceeds its snapshot counterpart; {@code false}
 *     otherwise
 */
public boolean changelogLifecycleDecoupled() {
    if (changelogNumRetainMax() > snapshotNumRetainMax()) {
        return true;
    }

    final Duration changelogRetention = options.get(CHANGELOG_TIME_RETAINED);
    final Duration snapshotRetention = options.get(SNAPSHOT_TIME_RETAINED);
    if (changelogRetention.compareTo(snapshotRetention) > 0) {
        return true;
    }

    return changelogNumRetainMin() > snapshotNumRetainMin();
}

/** Returns the value configured for {@link #SNAPSHOT_EXPIRE_EXECUTION_MODE}. */
public ExpireExecutionMode snapshotExpireExecutionMode() {
    final ExpireExecutionMode executionMode = options.get(SNAPSHOT_EXPIRE_EXECUTION_MODE);
    return executionMode;
}
Expand Down
253 changes: 253 additions & 0 deletions paimon-core/src/main/java/org/apache/paimon/Changelog.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,253 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon;

import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.manifest.ManifestFileMeta;
import org.apache.paimon.manifest.ManifestList;
import org.apache.paimon.utils.JsonSerdeUtil;

import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty;

import javax.annotation.Nullable;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

/**
 * The metadata of changelog. It generates from the snapshot file during expiration, so that the
 * changelog of the table can outlive the snapshot's lifecycle. A table's changelog can come from
 * two sources:
 *
 * <ul>
 *   <li>The changelog file. Eg: from the changelog-producer = 'input'
 *   <li>The delta files in the APPEND commits when the changelog-producer = 'none'
 * </ul>
 *
 * <p>Instances are serialized to and from JSON through {@link #toJson()} and {@link
 * #fromJson(String)}.
 */
public class Changelog {

    private static final int CURRENT_VERSION = 1;

    private static final String FIELD_VERSION = "version";
    private static final String FIELD_ID = "id";
    private static final String FIELD_SCHEMA_ID = "schemaId";
    private static final String FIELD_DELTA_MANIFEST_LIST = "deltaManifestList";
    private static final String FIELD_CHANGELOG_MANIFEST_LIST = "changelogManifestList";
    private static final String FIELD_COMMIT_KIND = "commitKind";
    private static final String FIELD_TIME_MILLIS = "timeMillis";
    private static final String FIELD_RECORD_COUNT = "recordCount";
    private static final String FIELD_WATERMARK = "watermark";

    @JsonProperty(FIELD_VERSION)
    private final int version;

    @JsonProperty(FIELD_ID)
    private final long id;

    @JsonProperty(FIELD_SCHEMA_ID)
    private final long schemaId;

    @JsonProperty(FIELD_DELTA_MANIFEST_LIST)
    @Nullable
    private final String deltaManifestList;

    @JsonProperty(FIELD_CHANGELOG_MANIFEST_LIST)
    @Nullable
    private final String changelogManifestList;

    @JsonProperty(FIELD_TIME_MILLIS)
    private final long timeMillis;

    @JsonProperty(FIELD_RECORD_COUNT)
    @Nullable
    private final Long recordCount;

    @JsonProperty(FIELD_WATERMARK)
    @Nullable
    private final Long watermark;

    @JsonProperty(FIELD_COMMIT_KIND)
    private final Snapshot.CommitKind commitKind;

    /**
     * Creates a changelog stamped with the current format version.
     *
     * @param id the changelog id
     * @param schemaId the schema id
     * @param deltaManifestList name of the delta manifest list, or {@code null} if there is none
     * @param changelogManifestList name of the changelog manifest list, or {@code null} if there
     *     is none
     * @param commitKind the commit kind
     * @param timeMillis the time in milliseconds
     * @param recordCount the record count, may be {@code null}
     * @param watermark the watermark, may be {@code null}
     */
    public Changelog(
            long id,
            long schemaId,
            @Nullable String deltaManifestList,
            @Nullable String changelogManifestList,
            Snapshot.CommitKind commitKind,
            long timeMillis,
            @Nullable Long recordCount,
            @Nullable Long watermark) {
        this(
                CURRENT_VERSION,
                id,
                schemaId,
                deltaManifestList,
                changelogManifestList,
                commitKind,
                timeMillis,
                recordCount,
                watermark);
    }

    /**
     * Creates a changelog from explicit field values; this is the constructor used for JSON
     * deserialization.
     *
     * @param version the format version; when {@code null} the current version is used
     * @param id the changelog id
     * @param schemaId the schema id
     * @param deltaManifestList name of the delta manifest list, or {@code null} if there is none
     * @param changelogManifestList name of the changelog manifest list, or {@code null} if there
     *     is none
     * @param commitKind the commit kind
     * @param timeMillis the time in milliseconds
     * @param recordCount the record count, may be {@code null}
     * @param watermark the watermark, may be {@code null}
     */
    @JsonCreator
    public Changelog(
            @JsonProperty(FIELD_VERSION) @Nullable Integer version,
            @JsonProperty(FIELD_ID) long id,
            @JsonProperty(FIELD_SCHEMA_ID) long schemaId,
            @JsonProperty(FIELD_DELTA_MANIFEST_LIST) @Nullable String deltaManifestList,
            @JsonProperty(FIELD_CHANGELOG_MANIFEST_LIST) @Nullable String changelogManifestList,
            @JsonProperty(FIELD_COMMIT_KIND) Snapshot.CommitKind commitKind,
            @JsonProperty(FIELD_TIME_MILLIS) long timeMillis,
            @JsonProperty(FIELD_RECORD_COUNT) @Nullable Long recordCount,
            @JsonProperty(FIELD_WATERMARK) @Nullable Long watermark) {
        // The parameter is declared nullable, but the field is a primitive: unboxing a null
        // Integer would throw a NullPointerException, so fall back to the current version.
        this.version = version == null ? CURRENT_VERSION : version;
        this.id = id;
        this.schemaId = schemaId;
        this.deltaManifestList = deltaManifestList;
        this.changelogManifestList = changelogManifestList;
        this.recordCount = recordCount;
        this.commitKind = commitKind;
        this.timeMillis = timeMillis;
        this.watermark = watermark;
    }

    /** Returns the format version of this changelog. */
    @JsonGetter(FIELD_VERSION)
    public int version() {
        return version;
    }

    /** Returns the id of this changelog. */
    @JsonGetter(FIELD_ID)
    public long id() {
        return id;
    }

    /** Returns the name of the changelog manifest list, or {@code null} if there is none. */
    @JsonGetter(FIELD_CHANGELOG_MANIFEST_LIST)
    @Nullable
    public String changelogManifestList() {
        return changelogManifestList;
    }

    /** Returns the name of the delta manifest list, or {@code null} if there is none. */
    @JsonGetter(FIELD_DELTA_MANIFEST_LIST)
    @Nullable
    public String deltaManifestList() {
        return deltaManifestList;
    }

    /** Returns the schema id of this changelog. */
    @JsonGetter(FIELD_SCHEMA_ID)
    public long schemaId() {
        return schemaId;
    }

    /** Returns the commit kind of this changelog. */
    @JsonGetter(FIELD_COMMIT_KIND)
    public Snapshot.CommitKind commitKind() {
        return commitKind;
    }

    /** Returns the time of this changelog in milliseconds. */
    @JsonGetter(FIELD_TIME_MILLIS)
    public long timeMillis() {
        return timeMillis;
    }

    /** Returns the record count, or {@code null} if it is not set. */
    @JsonGetter(FIELD_RECORD_COUNT)
    @Nullable
    public Long recordCount() {
        return recordCount;
    }

    /** Returns the watermark, or {@code null} if it is not set. */
    @JsonGetter(FIELD_WATERMARK)
    @Nullable
    public Long watermark() {
        return watermark;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof Changelog)) {
            return false;
        }
        Changelog changelog = (Changelog) o;
        return version == changelog.version
                && id == changelog.id
                && schemaId == changelog.schemaId
                && timeMillis == changelog.timeMillis
                && Objects.equals(deltaManifestList, changelog.deltaManifestList)
                && Objects.equals(changelogManifestList, changelog.changelogManifestList)
                && Objects.equals(recordCount, changelog.recordCount)
                && Objects.equals(watermark, changelog.watermark)
                && commitKind == changelog.commitKind;
    }

    @Override
    public int hashCode() {
        return Objects.hash(
                version,
                id,
                schemaId,
                deltaManifestList,
                changelogManifestList,
                timeMillis,
                recordCount,
                watermark,
                commitKind);
    }

    /**
     * Return a {@link ManifestFileMeta} for each delta manifest in this changelog.
     *
     * @param manifestList a {@link ManifestList} instance used for reading the delta manifest
     *     list.
     * @return a list of ManifestFileMeta; empty when this changelog has no delta manifest list.
     */
    public List<ManifestFileMeta> deltaManifests(ManifestList manifestList) {
        return deltaManifestList == null
                ? Collections.emptyList()
                : manifestList.read(deltaManifestList);
    }

    /**
     * Return a {@link ManifestFileMeta} for each changelog manifest in this changelog.
     *
     * @param manifestList a {@link ManifestList} instance used for reading the changelog
     *     manifest list.
     * @return a list of ManifestFileMeta; empty when this changelog has no changelog manifest
     *     list.
     */
    public List<ManifestFileMeta> changelogManifests(ManifestList manifestList) {
        return changelogManifestList == null
                ? Collections.emptyList()
                : manifestList.read(changelogManifestList);
    }

    /** Serializes this changelog to a JSON string. */
    public String toJson() {
        return JsonSerdeUtil.toJson(this);
    }

    /**
     * Deserializes a changelog from a JSON string.
     *
     * @param json the JSON text to parse
     * @return the parsed changelog
     */
    public static Changelog fromJson(String json) {
        return JsonSerdeUtil.fromJson(json, Changelog.class);
    }

    /**
     * Reads the file at the given path as UTF-8 text and deserializes it as a changelog.
     *
     * @param fileIO the file IO used to read the file
     * @param path the path of the changelog file
     * @return the parsed changelog
     * @throws RuntimeException if reading the file fails with an {@link IOException}
     */
    public static Changelog fromPath(FileIO fileIO, Path path) {
        try {
            String json = fileIO.readFileUtf8(path);
            return Changelog.fromJson(json);
        } catch (IOException e) {
            throw new RuntimeException("Fails to read changelog from path " + path, e);
        }
    }
}
Loading

0 comments on commit e750a4c

Please sign in to comment.