Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Introduce TTL for tag #3159

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions docs/content/maintenance/manage-tags.md
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ You can create a tag with given name and snapshot ID.
--table <table-name> \
--tag_name <tag-name> \
[--snapshot <snapshot_id>] \
[--time_retained <time-retained>] \
[--catalog_conf <paimon-catalog-conf> [--catalog_conf <paimon-catalog-conf> ...]]
```

Expand All @@ -126,6 +127,7 @@ public class CreateTag {
public static void main(String[] args) {
Table table = ...;
table.createTag("my-tag", 1);
table.createTag("my-tag-retained-12-hours", 1, Duration.ofHours(12));
}
}
```
Expand All @@ -138,6 +140,11 @@ Run the following sql:
CALL create_tag(table => 'test.t', tag => 'test_tag', snapshot => 2);
```

To create a tag with retained 1 day, run the following sql:
```sql
CALL create_tag(table => 'test.t', tag => 'test_tag', snapshot => 2, time_retained => '1 d');
```

To create a tag based on the latest snapshot id, run the following sql:
```sql
CALL create_tag(table => 'test.t', tag => 'test_tag');
Expand Down
6 changes: 6 additions & 0 deletions docs/layouts/shortcodes/generated/core_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -678,6 +678,12 @@
<td><p>Enum</p></td>
<td>What frequency is used to generate tags.<br /><br />Possible values:<ul><li>"daily": Generate a tag every day.</li><li>"hourly": Generate a tag every hour.</li><li>"two-hours": Generate a tag every two hours.</li></ul></td>
</tr>
<tr>
<td><h5>tag.default-time-retained</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>Duration</td>
<td>The default maximum time retained for newly created tags.</td>
</tr>
<tr>
<td><h5>tag.num-retained-max</h5></td>
<td style="word-wrap: break-word;">(none)</td>
Expand Down
11 changes: 11 additions & 0 deletions paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -1014,6 +1014,12 @@ public class CoreOptions implements Serializable {
.noDefaultValue()
.withDescription("The maximum number of tags to retain.");

public static final ConfigOption<Duration> TAG_DEFAULT_TIME_RETAINED =
key("tag.default-time-retained")
.durationType()
.noDefaultValue()
.withDescription("The default maximum time retained for newly created tags.");

public static final ConfigOption<Duration> SNAPSHOT_WATERMARK_IDLE_TIMEOUT =
key("snapshot.watermark-idle-timeout")
.durationType()
Expand Down Expand Up @@ -1643,10 +1649,15 @@ public TagPeriodFormatter tagPeriodFormatter() {
return options.get(TAG_PERIOD_FORMATTER);
}

@Nullable
public Integer tagNumRetainedMax() {
return options.get(TAG_NUM_RETAINED_MAX);
}

public Duration tagDefaultTimeRetained() {
return options.get(TAG_DEFAULT_TIME_RETAINED);
}

public Duration snapshotWatermarkIdleTimeout() {
return options.get(SNAPSHOT_WATERMARK_IDLE_TIMEOUT);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
import org.apache.paimon.table.CatalogEnvironment;
import org.apache.paimon.table.sink.CallbackUtils;
import org.apache.paimon.table.sink.TagCallback;
import org.apache.paimon.tag.TagAutoCreation;
import org.apache.paimon.tag.TagAutoManager;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.SegmentsCache;
Expand Down Expand Up @@ -248,9 +248,8 @@ public PartitionExpire newPartitionExpire(String commitUser) {
}

@Override
@Nullable
public TagAutoCreation newTagCreationManager() {
return TagAutoCreation.create(
public TagAutoManager newTagCreationManager() {
return TagAutoManager.create(
options,
snapshotManager(),
newTagManager(),
Expand Down
5 changes: 2 additions & 3 deletions paimon-core/src/main/java/org/apache/paimon/FileStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
import org.apache.paimon.stats.StatsFileHandler;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.sink.TagCallback;
import org.apache.paimon.tag.TagAutoCreation;
import org.apache.paimon.tag.TagAutoManager;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.SnapshotManager;
Expand Down Expand Up @@ -92,8 +92,7 @@ public interface FileStore<T> extends Serializable {
@Nullable
PartitionExpire newPartitionExpire(String commitUser);

@Nullable
TagAutoCreation newTagCreationManager();
TagAutoManager newTagCreationManager();

ServiceManager newServiceManager();

Expand Down
42 changes: 21 additions & 21 deletions paimon-core/src/main/java/org/apache/paimon/Snapshot.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

/**
* This file is the entrance to all data committed at some specific time point.
Expand Down Expand Up @@ -95,37 +94,37 @@ public class Snapshot {
// null for paimon <= 0.2
@JsonProperty(FIELD_VERSION)
@Nullable
private final Integer version;
protected final Integer version;

@JsonProperty(FIELD_ID)
private final long id;
protected final long id;

@JsonProperty(FIELD_SCHEMA_ID)
private final long schemaId;
protected final long schemaId;

// a manifest list recording all changes from the previous snapshots
@JsonProperty(FIELD_BASE_MANIFEST_LIST)
private final String baseManifestList;
protected final String baseManifestList;

// a manifest list recording all new changes occurred in this snapshot
// for faster expire and streaming reads
@JsonProperty(FIELD_DELTA_MANIFEST_LIST)
private final String deltaManifestList;
protected final String deltaManifestList;

// a manifest list recording all changelog produced in this snapshot
// null if no changelog is produced, or for paimon <= 0.2
@JsonProperty(FIELD_CHANGELOG_MANIFEST_LIST)
@Nullable
private final String changelogManifestList;
protected final String changelogManifestList;

// a manifest recording all index files of this table
// null if no index file
@JsonProperty(FIELD_INDEX_MANIFEST)
@JsonInclude(JsonInclude.Include.NON_NULL)
private final String indexManifest;
protected final String indexManifest;

@JsonProperty(FIELD_COMMIT_USER)
private final String commitUser;
protected final String commitUser;

// Mainly for snapshot deduplication.
//
Expand All @@ -135,37 +134,37 @@ public class Snapshot {
// If snapshot A has a smaller commitIdentifier than snapshot B, then snapshot A must be
// committed before snapshot B, and thus snapshot A must contain older records than snapshot B.
@JsonProperty(FIELD_COMMIT_IDENTIFIER)
private final long commitIdentifier;
protected final long commitIdentifier;

@JsonProperty(FIELD_COMMIT_KIND)
private final CommitKind commitKind;
protected final CommitKind commitKind;

@JsonProperty(FIELD_TIME_MILLIS)
private final long timeMillis;
protected final long timeMillis;

@JsonProperty(FIELD_LOG_OFFSETS)
@JsonInclude(JsonInclude.Include.NON_NULL)
@Nullable
private final Map<Integer, Long> logOffsets;
protected final Map<Integer, Long> logOffsets;

// record count of all changes occurred in this snapshot
// null for paimon <= 0.3
@JsonProperty(FIELD_TOTAL_RECORD_COUNT)
@Nullable
private final Long totalRecordCount;
protected final Long totalRecordCount;

// record count of all new changes occurred in this snapshot
// null for paimon <= 0.3
@JsonProperty(FIELD_DELTA_RECORD_COUNT)
@Nullable
private final Long deltaRecordCount;
protected final Long deltaRecordCount;

// record count of all changelog produced in this snapshot
// null for paimon <= 0.3
@JsonProperty(FIELD_CHANGELOG_RECORD_COUNT)
@JsonInclude(JsonInclude.Include.NON_NULL)
@Nullable
private final Long changelogRecordCount;
protected final Long changelogRecordCount;

// watermark for input records
// null for paimon <= 0.3
Expand All @@ -174,14 +173,14 @@ public class Snapshot {
@JsonProperty(FIELD_WATERMARK)
@JsonInclude(JsonInclude.Include.NON_NULL)
@Nullable
private final Long watermark;
protected final Long watermark;

// stats file name for statistics of this table
// null if no stats file
@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonProperty(FIELD_STATISTICS)
@Nullable
private final String statistics;
protected final String statistics;

public Snapshot(
long id,
Expand Down Expand Up @@ -447,12 +446,13 @@ public static Snapshot fromPath(FileIO fileIO, Path path) {
}
}

public static Optional<Snapshot> safelyFromPath(FileIO fileIO, Path path) throws IOException {
@Nullable
public static Snapshot safelyFromPath(FileIO fileIO, Path path) throws IOException {
try {
String json = fileIO.readFileUtf8(path);
return Optional.of(Snapshot.fromJson(json));
return Snapshot.fromJson(json);
} catch (FileNotFoundException e) {
return Optional.empty();
return null;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1231,7 +1231,7 @@ static ConflictCheck noConflictCheck() {
return latestSnapshot -> false;
}

static ConflictCheck mustConflictCheck() {
public static ConflictCheck mustConflictCheck() {
return latestSnapshot -> true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@

import java.io.IOException;
import java.io.UncheckedIOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -461,8 +462,7 @@ public void rollbackTo(long snapshotId) {
rollbackHelper().cleanLargerThan(snapshotManager.snapshot(snapshotId));
}

@Override
public void createTag(String tagName, long fromSnapshotId) {
public Snapshot createTagInternal(long fromSnapshotId) {
SnapshotManager snapshotManager = snapshotManager();
Snapshot snapshot = null;
if (snapshotManager.snapshotExists(fromSnapshotId)) {
Expand All @@ -482,18 +482,36 @@ public void createTag(String tagName, long fromSnapshotId) {
snapshot != null,
"Cannot create tag because given snapshot #%s doesn't exist.",
fromSnapshotId);
createTag(tagName, snapshot);
return snapshot;
}

@Override
public void createTag(String tagName, long fromSnapshotId) {
createTag(
tagName, createTagInternal(fromSnapshotId), coreOptions().tagDefaultTimeRetained());
}

@Override
public void createTag(String tagName, long fromSnapshotId, Duration timeRetained) {
createTag(tagName, createTagInternal(fromSnapshotId), timeRetained);
}

@Override
public void createTag(String tagName) {
wwj6591812 marked this conversation as resolved.
Show resolved Hide resolved
Snapshot latestSnapshot = snapshotManager().latestSnapshot();
checkNotNull(latestSnapshot, "Cannot create tag because latest snapshot doesn't exist.");
createTag(tagName, latestSnapshot);
createTag(tagName, latestSnapshot, coreOptions().tagDefaultTimeRetained());
}

@Override
public void createTag(String tagName, Duration timeRetained) {
Snapshot latestSnapshot = snapshotManager().latestSnapshot();
checkNotNull(latestSnapshot, "Cannot create tag because latest snapshot doesn't exist.");
createTag(tagName, latestSnapshot, timeRetained);
}

private void createTag(String tagName, Snapshot fromSnapshot) {
tagManager().createTag(fromSnapshot, tagName, store().createTagCallbacks());
private void createTag(String tagName, Snapshot fromSnapshot, @Nullable Duration timeRetained) {
tagManager().createTag(fromSnapshot, tagName, timeRetained, store().createTagCallbacks());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.paimon.table.sink.StreamWriteBuilder;
import org.apache.paimon.table.source.InnerStreamTableScan;

import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -109,6 +110,14 @@ default void createTag(String tagName, long fromSnapshotId) {
this.getClass().getSimpleName()));
}

@Override
default void createTag(String tagName, long fromSnapshotId, Duration timeRetained) {
throw new UnsupportedOperationException(
String.format(
"Readonly Table %s does not support createTag.",
this.getClass().getSimpleName()));
}

@Override
default void createTag(String tagName) {
throw new UnsupportedOperationException(
Expand All @@ -117,6 +126,14 @@ default void createTag(String tagName) {
this.getClass().getSimpleName()));
}

@Override
default void createTag(String tagName, Duration timeRetained) {
throw new UnsupportedOperationException(
String.format(
"Readonly Table %s does not support createTag.",
this.getClass().getSimpleName()));
}

@Override
default void deleteTag(String tagName) {
throw new UnsupportedOperationException(
Expand Down
7 changes: 7 additions & 0 deletions paimon-core/src/main/java/org/apache/paimon/table/Table.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.paimon.types.RowType;

import java.io.Serializable;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -76,10 +77,16 @@ public interface Table extends Serializable {
@Experimental
void createTag(String tagName, long fromSnapshotId);

@Experimental
void createTag(String tagName, long fromSnapshotId, Duration timeRetained);

/** Create a tag from latest snapshot. */
@Experimental
void createTag(String tagName);

@Experimental
void createTag(String tagName, Duration timeRetained);

/** Delete a tag by name. */
@Experimental
void deleteTag(String tagName);
Expand Down
Loading
Loading