diff --git a/docs/content/maintenance/manage-tags.md b/docs/content/maintenance/manage-tags.md index bc1bd45e27f80..b74e738222ad3 100644 --- a/docs/content/maintenance/manage-tags.md +++ b/docs/content/maintenance/manage-tags.md @@ -108,6 +108,7 @@ You can create a tag with given name and snapshot ID. --database \ --table \ --tag_name \ + --time_retained \ [--snapshot ] \ [--catalog_conf [--catalog_conf ...]] ``` @@ -126,6 +127,7 @@ public class CreateTag { public static void main(String[] args) { Table table = ...; table.createTag("my-tag", 1); + table.createTag("my-tag-retained-12-hours", Duration.ofHours(12), 1); } } ``` @@ -138,6 +140,11 @@ Run the following sql: CALL create_tag(table => 'test.t', tag => 'test_tag', snapshot => 2); ``` +To create a tag that is retained for 1 day, run the following sql: +```sql +CALL create_tag(table => 'test.t', tag => 'test_tag', time_retained => '1 d', snapshot => 2); +``` + To create a tag based on the latest snapshot id, run the following sql: ```sql CALL create_tag(table => 'test.t', tag => 'test_tag'); diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html index 5959b6b4f0961..e4f4f6a4b3486 100644 --- a/docs/layouts/shortcodes/generated/core_configuration.html +++ b/docs/layouts/shortcodes/generated/core_configuration.html @@ -666,6 +666,12 @@ Integer The maximum number of tags to retain. + +
tag.time-retained
+ (none) + Duration + The maximum retained time for all tags. +
tag.period-formatter
with_dashes diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java index cbe5f45c5b508..f5e63c35e3e3a 100644 --- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java @@ -70,6 +70,8 @@ public class CoreOptions implements Serializable { public static final String DISTINCT = "distinct"; + public static final Duration DEFAULT_TAG_TIME_RETAINED = Duration.ofDays(3650000); + public static final ConfigOption BUCKET = key("bucket") .intType() @@ -989,6 +991,12 @@ public class CoreOptions implements Serializable { .noDefaultValue() .withDescription("The maximum number of tags to retain."); + public static final ConfigOption TAG_TIME_RETAINED = + key("tag.time-retained") + .durationType() + .defaultValue(DEFAULT_TAG_TIME_RETAINED) + .withDescription("The maximum retained time for all tags."); + public static final ConfigOption SNAPSHOT_WATERMARK_IDLE_TIMEOUT = key("snapshot.watermark-idle-timeout") .durationType() @@ -1617,6 +1625,10 @@ public Integer tagNumRetainedMax() { return options.get(TAG_NUM_RETAINED_MAX); } + public Duration tagTimeRetained() { + return options.get(TAG_TIME_RETAINED); + } + public Duration snapshotWatermarkIdleTimeout() { return options.get(SNAPSHOT_WATERMARK_IDLE_TIMEOUT); } diff --git a/paimon-core/src/main/java/org/apache/paimon/Snapshot.java b/paimon-core/src/main/java/org/apache/paimon/Snapshot.java index 33ea8f4bb7cab..6221752f01afe 100644 --- a/paimon-core/src/main/java/org/apache/paimon/Snapshot.java +++ b/paimon-core/src/main/java/org/apache/paimon/Snapshot.java @@ -93,37 +93,37 @@ public class Snapshot { // null for paimon <= 0.2 @JsonProperty(FIELD_VERSION) @Nullable - private final Integer version; + protected final Integer version; @JsonProperty(FIELD_ID) - private final long id; + protected final long id; @JsonProperty(FIELD_SCHEMA_ID) - private final long schemaId; + 
protected final long schemaId; // a manifest list recording all changes from the previous snapshots @JsonProperty(FIELD_BASE_MANIFEST_LIST) - private final String baseManifestList; + protected final String baseManifestList; // a manifest list recording all new changes occurred in this snapshot // for faster expire and streaming reads @JsonProperty(FIELD_DELTA_MANIFEST_LIST) - private final String deltaManifestList; + protected final String deltaManifestList; // a manifest list recording all changelog produced in this snapshot // null if no changelog is produced, or for paimon <= 0.2 @JsonProperty(FIELD_CHANGELOG_MANIFEST_LIST) @Nullable - private final String changelogManifestList; + protected final String changelogManifestList; // a manifest recording all index files of this table // null if no index file @JsonProperty(FIELD_INDEX_MANIFEST) @JsonInclude(JsonInclude.Include.NON_NULL) - private final String indexManifest; + protected final String indexManifest; @JsonProperty(FIELD_COMMIT_USER) - private final String commitUser; + protected final String commitUser; // Mainly for snapshot deduplication. // @@ -133,34 +133,34 @@ public class Snapshot { // If snapshot A has a smaller commitIdentifier than snapshot B, then snapshot A must be // committed before snapshot B, and thus snapshot A must contain older records than snapshot B. 
@JsonProperty(FIELD_COMMIT_IDENTIFIER) - private final long commitIdentifier; + protected final long commitIdentifier; @JsonProperty(FIELD_COMMIT_KIND) - private final CommitKind commitKind; + protected final CommitKind commitKind; @JsonProperty(FIELD_TIME_MILLIS) - private final long timeMillis; + protected final long timeMillis; @JsonProperty(FIELD_LOG_OFFSETS) - private final Map logOffsets; + protected final Map logOffsets; // record count of all changes occurred in this snapshot // null for paimon <= 0.3 @JsonProperty(FIELD_TOTAL_RECORD_COUNT) @Nullable - private final Long totalRecordCount; + protected final Long totalRecordCount; // record count of all new changes occurred in this snapshot // null for paimon <= 0.3 @JsonProperty(FIELD_DELTA_RECORD_COUNT) @Nullable - private final Long deltaRecordCount; + protected final Long deltaRecordCount; // record count of all changelog produced in this snapshot // null for paimon <= 0.3 @JsonProperty(FIELD_CHANGELOG_RECORD_COUNT) @Nullable - private final Long changelogRecordCount; + protected final Long changelogRecordCount; // watermark for input records // null for paimon <= 0.3 @@ -168,14 +168,14 @@ public class Snapshot { // watermark @JsonProperty(FIELD_WATERMARK) @Nullable - private final Long watermark; + protected final Long watermark; // stats file name for statistics of this table // null if no stats file @JsonInclude(JsonInclude.Include.NON_NULL) @JsonProperty(FIELD_STATISTICS) @Nullable - private final String statistics; + protected final String statistics; public Snapshot( long id, diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java index 99ad605df8ef8..5b69324bf7ff1 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java @@ -59,6 +59,7 @@ import java.io.IOException; 
import java.io.UncheckedIOException; +import java.time.Duration; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -443,8 +444,7 @@ public void rollbackTo(long snapshotId) { rollbackHelper().cleanLargerThan(snapshotManager.snapshot(snapshotId)); } - @Override - public void createTag(String tagName, long fromSnapshotId) { + public Snapshot createTagInternal(long fromSnapshotId) { SnapshotManager snapshotManager = snapshotManager(); Snapshot snapshot = null; if (snapshotManager.snapshotExists(fromSnapshotId)) { @@ -464,18 +464,35 @@ public void createTag(String tagName, long fromSnapshotId) { snapshot != null, "Cannot create tag because given snapshot #%s doesn't exist.", fromSnapshotId); - createTag(tagName, snapshot); + return snapshot; + } + + @Override + public void createTag(String tagName, long fromSnapshotId) { + createTag(tagName, coreOptions().tagTimeRetained(), createTagInternal(fromSnapshotId)); + } + + @Override + public void createTag(String tagName, Duration timeRetained, long fromSnapshotId) { + createTag(tagName, timeRetained, createTagInternal(fromSnapshotId)); } @Override public void createTag(String tagName) { Snapshot latestSnapshot = snapshotManager().latestSnapshot(); checkNotNull(latestSnapshot, "Cannot create tag because latest snapshot doesn't exist."); - createTag(tagName, latestSnapshot); + createTag(tagName, coreOptions().tagTimeRetained(), latestSnapshot); + } + + @Override + public void createTag(String tagName, Duration timeRetained) { + Snapshot latestSnapshot = snapshotManager().latestSnapshot(); + checkNotNull(latestSnapshot, "Cannot create tag because latest snapshot doesn't exist."); + createTag(tagName, timeRetained, latestSnapshot); } - private void createTag(String tagName, Snapshot fromSnapshot) { - tagManager().createTag(fromSnapshot, tagName, store().createTagCallbacks()); + private void createTag(String tagName, Duration timeRetained, Snapshot fromSnapshot) { + 
tagManager().createTag(fromSnapshot, tagName, timeRetained, store().createTagCallbacks()); } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java b/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java index d07035f2651e3..2119137ee8e2f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java @@ -18,6 +18,7 @@ package org.apache.paimon.table; +import org.apache.paimon.annotation.Experimental; import org.apache.paimon.stats.Statistics; import org.apache.paimon.table.sink.BatchWriteBuilder; import org.apache.paimon.table.sink.InnerTableCommit; @@ -25,6 +26,7 @@ import org.apache.paimon.table.sink.StreamWriteBuilder; import org.apache.paimon.table.source.InnerStreamTableScan; +import java.time.Duration; import java.util.Collections; import java.util.List; import java.util.Map; @@ -109,6 +111,14 @@ default void createTag(String tagName, long fromSnapshotId) { this.getClass().getSimpleName())); } + @Experimental + default void createTag(String tagName, Duration timeRetained, long fromSnapshotId) { + throw new UnsupportedOperationException( + String.format( + "Readonly Table %s does not support createTag.", + this.getClass().getSimpleName())); + } + @Override default void createTag(String tagName) { throw new UnsupportedOperationException( @@ -117,6 +127,14 @@ default void createTag(String tagName) { this.getClass().getSimpleName())); } + @Experimental + default void createTag(String tagName, Duration timeRetained) { + throw new UnsupportedOperationException( + String.format( + "Readonly Table %s does not support createTag.", + this.getClass().getSimpleName())); + } + @Override default void deleteTag(String tagName) { throw new UnsupportedOperationException( diff --git a/paimon-core/src/main/java/org/apache/paimon/table/Table.java b/paimon-core/src/main/java/org/apache/paimon/table/Table.java index 
3ed6a1990e5fe..ae0be3115b6f6 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/Table.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/Table.java @@ -27,6 +27,7 @@ import org.apache.paimon.types.RowType; import java.io.Serializable; +import java.time.Duration; import java.util.List; import java.util.Map; import java.util.Optional; @@ -76,10 +77,16 @@ public interface Table extends Serializable { @Experimental void createTag(String tagName, long fromSnapshotId); + @Experimental + void createTag(String tagName, Duration timeRetained, long fromSnapshotId); + /** Create a tag from latest snapshot. */ @Experimental void createTag(String tagName); + @Experimental + void createTag(String tagName, Duration timeRetained); + /** Delete a tag by name. */ @Experimental void deleteTag(String tagName); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java index ebd8aa7af0db9..a669077fdf748 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java @@ -39,6 +39,7 @@ import org.apache.paimon.table.source.ReadOnceTableScan; import org.apache.paimon.table.source.Split; import org.apache.paimon.table.source.TableRead; +import org.apache.paimon.tag.Tag; import org.apache.paimon.types.BigIntType; import org.apache.paimon.types.DataField; import org.apache.paimon.types.RowType; @@ -63,6 +64,7 @@ import java.util.SortedMap; import static org.apache.paimon.catalog.Catalog.SYSTEM_TABLE_SPLITTER; +import static org.apache.paimon.utils.Preconditions.checkArgument; /** A {@link Table} for showing tags of table. 
*/ public class TagsTable implements ReadonlyTable { @@ -79,7 +81,10 @@ public class TagsTable implements ReadonlyTable { new DataField(2, "schema_id", new BigIntType(false)), new DataField(3, "commit_time", new TimestampType(false, 3)), new DataField(4, "record_count", new BigIntType(true)), - new DataField(5, "branches", SerializationUtils.newStringType(true)))); + new DataField(5, "branches", SerializationUtils.newStringType(true)), + new DataField(6, "create_time", SerializationUtils.newStringType(true)), + new DataField( + 7, "time_retained", SerializationUtils.newStringType(true)))); private final FileIO fileIO; private final Path location; @@ -234,17 +239,21 @@ public RecordReader createReader(Split split) { } private InternalRow toRow( - Map.Entry tag, Map> tagBranches) { - Snapshot snapshot = tag.getValue(); - List branches = tagBranches.get(tag.getKey()); + Map.Entry snapshot, Map> tagBranches) { + checkArgument( + snapshot.getValue() instanceof Tag, + "There is a bug, Snapshot in tagManager.tags() must be Tag."); + Tag tag = (Tag) snapshot.getValue(); + List branches = tagBranches.get(snapshot.getKey()); return GenericRow.of( - BinaryString.fromString(tag.getKey()), - snapshot.id(), - snapshot.schemaId(), - Timestamp.fromLocalDateTime( - DateTimeUtils.toLocalDateTime(snapshot.timeMillis())), - snapshot.totalRecordCount(), - BinaryString.fromString(branches == null ? "[]" : branches.toString())); + BinaryString.fromString(snapshot.getKey()), + tag.id(), + tag.schemaId(), + Timestamp.fromLocalDateTime(DateTimeUtils.toLocalDateTime(tag.timeMillis())), + tag.totalRecordCount(), + BinaryString.fromString(branches == null ? 
"[]" : branches.toString()), + tag.createTime(), + tag.timeRetained()); } } } diff --git a/paimon-core/src/main/java/org/apache/paimon/tag/Tag.java b/paimon-core/src/main/java/org/apache/paimon/tag/Tag.java new file mode 100644 index 0000000000000..f348d0d87e5fa --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/tag/Tag.java @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.tag; + +import org.apache.paimon.Snapshot; +import org.apache.paimon.fs.FileIO; +import org.apache.paimon.fs.Path; +import org.apache.paimon.utils.JsonSerdeUtil; + +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty; + +import javax.annotation.Nullable; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.time.Duration; +import java.time.LocalDateTime; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; + +/** Snapshot with tagCreateTime and tagTimeRetained. 
*/ +public class Tag extends Snapshot { + + private static final String FIELD_CREATE_TIME = "createTime"; + private static final String FIELD_TIME_RETAINED = "timeRetained"; + + @JsonProperty(FIELD_CREATE_TIME) + @Nullable + private final LocalDateTime createTime; + + @JsonProperty(FIELD_TIME_RETAINED) + @Nullable + private final Duration timeRetained; + + @JsonCreator + public Tag( + @JsonProperty(FIELD_ID) long id, + @JsonProperty(FIELD_SCHEMA_ID) long schemaId, + @JsonProperty(FIELD_BASE_MANIFEST_LIST) String baseManifestList, + @JsonProperty(FIELD_DELTA_MANIFEST_LIST) String deltaManifestList, + @JsonProperty(FIELD_CHANGELOG_MANIFEST_LIST) @Nullable String changelogManifestList, + @JsonProperty(FIELD_INDEX_MANIFEST) @Nullable String indexManifest, + @JsonProperty(FIELD_COMMIT_USER) String commitUser, + @JsonProperty(FIELD_COMMIT_IDENTIFIER) long commitIdentifier, + @JsonProperty(FIELD_COMMIT_KIND) CommitKind commitKind, + @JsonProperty(FIELD_TIME_MILLIS) long timeMillis, + @JsonProperty(FIELD_LOG_OFFSETS) Map logOffsets, + @JsonProperty(FIELD_TOTAL_RECORD_COUNT) @Nullable Long totalRecordCount, + @JsonProperty(FIELD_DELTA_RECORD_COUNT) @Nullable Long deltaRecordCount, + @JsonProperty(FIELD_CHANGELOG_RECORD_COUNT) @Nullable Long changelogRecordCount, + @JsonProperty(FIELD_WATERMARK) @Nullable Long watermark, + @JsonProperty(FIELD_STATISTICS) @Nullable String statistics, + @JsonProperty(FIELD_CREATE_TIME) @Nullable LocalDateTime createTime, + @JsonProperty(FIELD_TIME_RETAINED) @Nullable Duration timeRetained) { + super( + CURRENT_VERSION, + id, + schemaId, + baseManifestList, + deltaManifestList, + changelogManifestList, + indexManifest, + commitUser, + commitIdentifier, + commitKind, + timeMillis, + logOffsets, + totalRecordCount, + deltaRecordCount, + changelogRecordCount, + watermark, + statistics); + this.createTime = createTime; + this.timeRetained = timeRetained; + } + + @JsonGetter(FIELD_CREATE_TIME) + public LocalDateTime createTime() { + return 
createTime; + } + + @JsonGetter(FIELD_TIME_RETAINED) + public Duration timeRetained() { + return timeRetained; + } + + public String toJson() { + return JsonSerdeUtil.toJson(this); + } + + public static Tag fromJson(String json) { + return JsonSerdeUtil.fromJson(json, Tag.class); + } + + public static Tag fromPath(FileIO fileIO, Path path) { + try { + String json = fileIO.readFileUtf8(path); + return Tag.fromJson(json); + } catch (IOException e) { + throw new RuntimeException("Fails to read tag from path " + path, e); + } + } + + public static Optional safelyFromTagPath(FileIO fileIO, Path path) throws IOException { + try { + String json = fileIO.readFileUtf8(path); + return Optional.of(Tag.fromJson(json)); + } catch (FileNotFoundException e) { + return Optional.empty(); + } + } + + public static Tag fromSnapshotAndTagTtl( + Snapshot snapshot, Duration timeRetained, LocalDateTime createTime) { + return new Tag( + snapshot.id(), + snapshot.schemaId(), + snapshot.baseManifestList(), + snapshot.deltaManifestList(), + snapshot.changelogManifestList(), + snapshot.indexManifest(), + snapshot.commitUser(), + snapshot.commitIdentifier(), + snapshot.commitKind(), + snapshot.timeMillis(), + snapshot.logOffsets(), + snapshot.totalRecordCount(), + snapshot.deltaRecordCount(), + snapshot.changelogRecordCount(), + snapshot.watermark(), + snapshot.statistics(), + createTime, + timeRetained); + } + + public Snapshot toSnapshot() { + return new Snapshot( + id, + schemaId, + baseManifestList, + deltaManifestList, + changelogManifestList, + indexManifest, + commitUser, + commitIdentifier, + commitKind, + timeMillis, + logOffsets, + totalRecordCount, + deltaRecordCount, + changelogRecordCount, + watermark, + statistics); + } + + @Override + public int hashCode() { + return Objects.hash( + version, + id, + schemaId, + baseManifestList, + deltaManifestList, + changelogManifestList, + indexManifest, + commitUser, + commitIdentifier, + commitKind, + timeMillis, + logOffsets, + 
totalRecordCount, + deltaRecordCount, + changelogRecordCount, + watermark, + createTime, + timeRetained); + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof Tag)) { + return false; + } + Tag that = (Tag) o; + return Objects.equals(version, that.version) + && id == that.id + && schemaId == that.schemaId + && Objects.equals(baseManifestList, that.baseManifestList) + && Objects.equals(deltaManifestList, that.deltaManifestList) + && Objects.equals(changelogManifestList, that.changelogManifestList) + && Objects.equals(indexManifest, that.indexManifest) + && Objects.equals(commitUser, that.commitUser) + && commitIdentifier == that.commitIdentifier + && commitKind == that.commitKind + && timeMillis == that.timeMillis + && Objects.equals(logOffsets, that.logOffsets) + && Objects.equals(totalRecordCount, that.totalRecordCount) + && Objects.equals(deltaRecordCount, that.deltaRecordCount) + && Objects.equals(changelogRecordCount, that.changelogRecordCount) + && Objects.equals(watermark, that.watermark) + && Objects.equals(createTime, that.createTime) + && Objects.equals(timeRetained, that.timeRetained); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java b/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java index 505454313bd20..9d9b5007db157 100644 --- a/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java +++ b/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java @@ -27,23 +27,32 @@ import org.apache.paimon.utils.SnapshotManager; import org.apache.paimon.utils.TagManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import javax.annotation.Nullable; import java.time.Duration; import java.time.Instant; import java.time.LocalDateTime; import java.time.ZoneId; +import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.SortedMap; import static 
org.apache.paimon.Snapshot.FIRST_SNAPSHOT_ID; import static org.apache.paimon.shade.guava30.com.google.common.base.MoreObjects.firstNonNull; +import static org.apache.paimon.utils.Preconditions.checkArgument; import static org.apache.paimon.utils.Preconditions.checkState; /** A manager to create tags automatically. */ public class TagAutoCreation { + private static final Logger LOG = LoggerFactory.getLogger(TagAutoCreation.class); + private final SnapshotManager snapshotManager; private final TagManager tagManager; private final TagDeletion tagDeletion; @@ -51,6 +60,7 @@ public class TagAutoCreation { private final TagPeriodHandler periodHandler; private final Duration delay; private final Integer numRetainedMax; + private final Duration timeRetained; private final List callbacks; private final Duration idlenessTimeout; @@ -65,6 +75,7 @@ private TagAutoCreation( TagPeriodHandler periodHandler, Duration delay, Integer numRetainedMax, + Duration timeRetained, Duration idlenessTimeout, List callbacks) { this.snapshotManager = snapshotManager; @@ -74,6 +85,7 @@ private TagAutoCreation( this.periodHandler = periodHandler; this.delay = delay; this.numRetainedMax = numRetainedMax; + this.timeRetained = timeRetained; this.callbacks = callbacks; this.idlenessTimeout = idlenessTimeout; @@ -118,7 +130,7 @@ public boolean forceCreatingSnapshot() { public void run() { while (true) { if (snapshotManager.snapshotExists(nextSnapshot)) { - tryToTag(snapshotManager.snapshot(nextSnapshot)); + tryToCreateTags(snapshotManager.snapshot(nextSnapshot)); nextSnapshot++; } else { // avoid snapshot has been expired @@ -130,9 +142,10 @@ public void run() { } } } + expireTags(); } - private void tryToTag(Snapshot snapshot) { + private void tryToCreateTags(Snapshot snapshot) { Optional timeOptional = timeExtractor.extract(snapshot.timeMillis(), snapshot.watermark()); if (!timeOptional.isPresent()) { @@ -144,29 +157,66 @@ private void tryToTag(Snapshot snapshot) { || 
isAfterOrEqual(time.minus(delay), periodHandler.nextTagTime(nextTag))) { LocalDateTime thisTag = periodHandler.normalizeToPreviousTag(time); String tagName = periodHandler.timeToTag(thisTag); - tagManager.createTag(snapshot, tagName, callbacks); + tagManager.createTag(snapshot, tagName, timeRetained, callbacks); nextTag = periodHandler.nextTagTime(thisTag); + } + } + + private void expireTags() { + Set deleteTags = new HashSet<>(); + deleteTags.addAll(getExpireTagsByNumRetainedMax()); + deleteTags.addAll(getExpireTagsByTimeRetained()); + deleteTags.forEach( + tag -> tagManager.deleteTag(tag, tagDeletion, snapshotManager, callbacks)); + } - if (numRetainedMax != null) { - // only handle auto-created tags here - SortedMap> tags = tagManager.tags(periodHandler::isAutoTag); - if (tags.size() > numRetainedMax) { - int toDelete = tags.size() - numRetainedMax; - int i = 0; - for (List tag : tags.values()) { - tagManager.deleteTag( - checkAndGetOneAutoTag(tag), - tagDeletion, - snapshotManager, - callbacks); - i++; - if (i == toDelete) { - break; - } + private Set getExpireTagsByNumRetainedMax() { + Set deleteTags = new HashSet<>(); + if (numRetainedMax != null) { + // only handle auto-created tags here + SortedMap> tags = tagManager.tags(periodHandler::isAutoTag); + if (tags.size() > numRetainedMax) { + int toDelete = tags.size() - numRetainedMax; + int i = 0; + for (List tag : tags.values()) { + String tagName = checkAndGetOneAutoTag(tag); + LOG.info( + "Delete tag {}, because the number of auto-created tags reached numRetainedMax of {}.", + tagName, + numRetainedMax); + deleteTags.add(tagName); + i++; + if (i == toDelete) { + break; } } } } + return deleteTags; + } + + private Set getExpireTagsByTimeRetained() { + // handle auto-created and non-auto-created-tags here + Set deleteTags = new HashSet<>(); + SortedMap> tags = tagManager.tags(); + for (Map.Entry> entry : tags.entrySet()) { + checkArgument( + entry.getKey() instanceof Tag, + "There is a bug, snapshot in 
tagManager.tags() must be Tag."); + Tag tag = (Tag) entry.getKey(); + LocalDateTime createTime = tag.createTime(); + Duration timeRetained = tag.timeRetained(); + if (LocalDateTime.now().isAfter(createTime.plus(timeRetained))) { + for (String tagName : entry.getValue()) { + LOG.info( + "Delete tag {}, because its existence time has reached its timeRetained of {}.", + tagName, + timeRetained); + deleteTags.add(tagName); + } + } + } + return deleteTags; } private boolean isAfterOrEqual(LocalDateTime t1, LocalDateTime t2) { @@ -201,6 +251,7 @@ public static TagAutoCreation create( TagPeriodHandler.create(options), options.tagCreationDelay(), options.tagNumRetainedMax(), + options.tagTimeRetained(), options.snapshotWatermarkIdleTimeout(), callbacks); } diff --git a/paimon-core/src/main/java/org/apache/paimon/tag/TagPeriodHandler.java b/paimon-core/src/main/java/org/apache/paimon/tag/TagPeriodHandler.java index c0fbe718c8fa9..4f6d314b0fbcc 100644 --- a/paimon-core/src/main/java/org/apache/paimon/tag/TagPeriodHandler.java +++ b/paimon-core/src/main/java/org/apache/paimon/tag/TagPeriodHandler.java @@ -90,6 +90,8 @@ public interface TagPeriodHandler { boolean isAutoTag(String tagName); + boolean isNonAutoTag(String tagName); + /** Base implementation of {@link TagPeriodHandler}. */ abstract class BaseTagPeriodHandler implements TagPeriodHandler { @@ -136,6 +138,11 @@ public boolean isAutoTag(String tagName) { return false; } } + + @Override + public boolean isNonAutoTag(String tagName) { + return !isAutoTag(tagName); + } } /** Hourly {@link TagPeriodHandler}. 
*/ diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/JsonSerdeUtil.java b/paimon-core/src/main/java/org/apache/paimon/utils/JsonSerdeUtil.java index 676276a30e58b..aa493dd4a90b3 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/JsonSerdeUtil.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/JsonSerdeUtil.java @@ -36,6 +36,7 @@ import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer; import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.module.SimpleModule; import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ser.std.StdSerializer; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; import javax.annotation.Nullable; @@ -57,6 +58,7 @@ public class JsonSerdeUtil { static { OBJECT_MAPPER_INSTANCE = new ObjectMapper(); OBJECT_MAPPER_INSTANCE.registerModule(createPaimonJacksonModule()); + OBJECT_MAPPER_INSTANCE.registerModule(new JavaTimeModule()); } public static LinkedHashMap parseJsonMap(String jsonString, Class valueType) { diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java index 134dea459f02a..c23ad9a6bdb6f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java @@ -26,11 +26,14 @@ import org.apache.paimon.manifest.ManifestEntry; import org.apache.paimon.operation.TagDeletion; import org.apache.paimon.table.sink.TagCallback; +import org.apache.paimon.tag.Tag; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.time.Duration; +import java.time.LocalDateTime; import java.util.ArrayList; import java.util.Collection; import java.util.Comparator; @@ -75,7 +78,8 @@ public Path branchTagPath(String branchName, String tagName) { } /** Create a tag from given snapshot and save it in the 
storage. */ - public void createTag(Snapshot snapshot, String tagName, List callbacks) { + public void createTag( + Snapshot snapshot, String tagName, Duration timeRetained, List callbacks) { checkArgument(!StringUtils.isBlank(tagName), "Tag name '%s' is blank.", tagName); // skip create tag for the same snapshot of the same name. @@ -86,7 +90,10 @@ public void createTag(Snapshot snapshot, String tagName, List callb } else { Path newTagPath = tagPath(tagName); try { - fileIO.writeFileUtf8(newTagPath, snapshot.toJson()); + fileIO.writeFileUtf8( + newTagPath, + Tag.fromSnapshotAndTagTtl(snapshot, timeRetained, LocalDateTime.now()) + .toJson()); } catch (IOException e) { throw new RuntimeException( String.format( @@ -227,7 +234,7 @@ public boolean tagExists(String tagName) { /** Get the tagged snapshot by name. */ public Snapshot taggedSnapshot(String tagName) { checkArgument(tagExists(tagName), "Tag '%s' doesn't exist.", tagName); - return Snapshot.fromPath(fileIO, tagPath(tagName)); + return Tag.fromPath(fileIO, tagPath(tagName)).toSnapshot(); } public long tagCount() { @@ -276,10 +283,10 @@ public SortedMap> tags(Predicate filter) { } // If the tag file is not found, it might be deleted by // other processes, so just skip this tag - Snapshot.safelyFromPath(fileIO, path) + Tag.safelyFromTagPath(fileIO, path) .ifPresent( - snapshot -> - tags.computeIfAbsent(snapshot, s -> new ArrayList<>()) + tag -> + tags.computeIfAbsent(tag, s -> new ArrayList<>()) .add(tagName)); } } catch (IOException e) { diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java index 819b70d8a5702..781072f85f812 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java @@ -61,6 +61,7 @@ import java.util.stream.Collectors; import static 
org.apache.paimon.operation.FileStoreCommitImpl.mustConflictCheck; +import static org.apache.paimon.CoreOptions.DEFAULT_TAG_TIME_RETAINED; import static org.apache.paimon.operation.FileStoreTestUtils.assertPathExists; import static org.apache.paimon.operation.FileStoreTestUtils.assertPathNotExists; import static org.apache.paimon.operation.FileStoreTestUtils.commitData; @@ -736,6 +737,6 @@ private void cleanBucket(TestFileStore store, BinaryRow partition, int bucket) { } private void createTag(Snapshot snapshot, String tagName) { - tagManager.createTag(snapshot, tagName, Collections.emptyList()); + tagManager.createTag(snapshot, tagName, DEFAULT_TAG_TIME_RETAINED, Collections.emptyList()); } } diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/UncleanedFileStoreExpireTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/UncleanedFileStoreExpireTest.java new file mode 100644 index 0000000000000..f99508fea74ab --- /dev/null +++ b/paimon-core/src/test/java/org/apache/paimon/operation/UncleanedFileStoreExpireTest.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.operation; + +import org.apache.paimon.KeyValue; +import org.apache.paimon.Snapshot; +import org.apache.paimon.fs.Path; +import org.apache.paimon.table.ExpireSnapshots; +import org.apache.paimon.utils.TagManager; + +import org.junit.jupiter.api.Test; + +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ThreadLocalRandom; +import java.util.stream.Collectors; + +import static org.apache.paimon.CoreOptions.DEFAULT_TAG_TIME_RETAINED; +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Tests for {@link ExpireSnapshots}. Some files not in use may still remain after the test due to + * the testing methods. + */ +public class UncleanedFileStoreExpireTest extends FileStoreExpireTestBase { + + @Test + public void testExpireWithMissingFiles() throws Exception { + ExpireSnapshots expire = store.newExpire(1, 1, 1); + + List allData = new ArrayList<>(); + List snapshotPositions = new ArrayList<>(); + commit(5, allData, snapshotPositions); + + int latestSnapshotId = snapshotManager.latestSnapshotId().intValue(); + Set filesInUse = store.getFilesInUse(latestSnapshotId); + List unusedFileList = + Files.walk(Paths.get(tempDir.toString())) + .filter(Files::isRegularFile) + .filter(p -> !p.getFileName().toString().startsWith("snapshot")) + .filter(p -> !p.getFileName().toString().startsWith("schema")) + .map(p -> new Path(p.toString())) + .filter(p -> !filesInUse.contains(p)) + .collect(Collectors.toList()); + + // shuffle list + ThreadLocalRandom random = ThreadLocalRandom.current(); + for (int i = unusedFileList.size() - 1; i > 0; i--) { + int j = random.nextInt(i + 1); + Collections.swap(unusedFileList, i, j); + } + + // delete some unused files + int numFilesToDelete = random.nextInt(unusedFileList.size()); + for (int i = 0; i < numFilesToDelete; i++) { + 
fileIO.deleteQuietly(unusedFileList.get(i)); + } + + expire.expire(); + + for (int i = 1; i < latestSnapshotId; i++) { + assertThat(snapshotManager.snapshotExists(i)).isFalse(); + } + assertThat(snapshotManager.snapshotExists(latestSnapshotId)).isTrue(); + assertSnapshot(latestSnapshotId, allData, snapshotPositions); + } + + @Test + public void testMixedSnapshotAndTagDeletion() throws Exception { + List allData = new ArrayList<>(); + List snapshotPositions = new ArrayList<>(); + ThreadLocalRandom random = ThreadLocalRandom.current(); + + commit(random.nextInt(10) + 30, allData, snapshotPositions); + int latestSnapshotId = snapshotManager.latestSnapshotId().intValue(); + TagManager tagManager = store.newTagManager(); + + // create tags for each snapshot + for (int id = 1; id <= latestSnapshotId; id++) { + Snapshot snapshot = snapshotManager.snapshot(id); + tagManager.createTag( + snapshot, "tag" + id, DEFAULT_TAG_TIME_RETAINED, Collections.emptyList()); + } + + // randomly expire snapshots + int expired = random.nextInt(latestSnapshotId / 2) + 1; + int retained = latestSnapshotId - expired; + store.newExpire(retained, retained, Long.MAX_VALUE).expire(); + + // randomly delete tags + for (int id = 1; id <= latestSnapshotId; id++) { + if (random.nextBoolean()) { + tagManager.deleteTag( + "tag" + id, + store.newTagDeletion(), + snapshotManager, + Collections.emptyList()); + } + } + + // check snapshots and tags + Set allSnapshots = new HashSet<>(); + snapshotManager.snapshots().forEachRemaining(allSnapshots::add); + allSnapshots.addAll(tagManager.taggedSnapshots()); + + for (Snapshot snapshot : allSnapshots) { + assertSnapshot(snapshot, allData, snapshotPositions); + } + } +} diff --git a/paimon-core/src/test/java/org/apache/paimon/table/system/TagsTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/system/TagsTableTest.java index 1f2b88047a8ee..b2e564efd76b0 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/system/TagsTableTest.java +++ 
b/paimon-core/src/test/java/org/apache/paimon/table/system/TagsTableTest.java @@ -30,6 +30,7 @@ import org.apache.paimon.table.Table; import org.apache.paimon.table.TableTestBase; import org.apache.paimon.table.sink.TableCommitImpl; +import org.apache.paimon.tag.Tag; import org.apache.paimon.types.DataTypes; import org.apache.paimon.utils.DateTimeUtils; import org.apache.paimon.utils.TagManager; @@ -45,6 +46,7 @@ import java.util.Map; import java.util.function.Function; +import static org.apache.paimon.utils.Preconditions.checkArgument; import static org.assertj.core.api.Assertions.assertThat; /** Unit tests for {@link TagsTable}. */ @@ -120,19 +122,24 @@ void testTagBranchesTable() throws Exception { private List getExceptedResult( Function> tagBranchesFunction) { List internalRows = new ArrayList<>(); - for (Map.Entry> tag : tagManager.tags().entrySet()) { - Snapshot snapshot = tag.getKey(); - for (String tagName : tag.getValue()) { + for (Map.Entry> snapshot : tagManager.tags().entrySet()) { + checkArgument( + snapshot.getKey() instanceof Tag, + "There is a bug, Snapshot in tagManager.tags() must be Tag."); + Tag tag = (Tag) snapshot.getKey(); + for (String tagName : snapshot.getValue()) { internalRows.add( GenericRow.of( BinaryString.fromString(tagName), - snapshot.id(), - snapshot.schemaId(), + tag.id(), + tag.schemaId(), Timestamp.fromLocalDateTime( - DateTimeUtils.toLocalDateTime(snapshot.timeMillis())), - snapshot.totalRecordCount(), + DateTimeUtils.toLocalDateTime(tag.timeMillis())), + tag.totalRecordCount(), BinaryString.fromString( - tagBranchesFunction.apply(tagName).toString()))); + tagBranchesFunction.apply(tagName).toString()), + tag.createTime(), + tag.timeRetained())); } } return internalRows; diff --git a/paimon-core/src/test/java/org/apache/paimon/tag/TagAutoCreationTest.java b/paimon-core/src/test/java/org/apache/paimon/tag/TagAutoCreationTest.java index f76a58e6cfd18..71224adede189 100644 --- 
a/paimon-core/src/test/java/org/apache/paimon/tag/TagAutoCreationTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/tag/TagAutoCreationTest.java @@ -43,6 +43,7 @@ import static org.apache.paimon.CoreOptions.TAG_CREATION_PERIOD; import static org.apache.paimon.CoreOptions.TAG_NUM_RETAINED_MAX; import static org.apache.paimon.CoreOptions.TAG_PERIOD_FORMATTER; +import static org.apache.paimon.CoreOptions.TAG_TIME_RETAINED; import static org.assertj.core.api.Assertions.assertThat; /** Test for tag automatic creation. */ @@ -320,6 +321,31 @@ public void testWatermarkIdleTimeoutForceCreatingSnapshot() throws Exception { commit.close(); } + @Test + public void testExpireTagsByTimeRetained() throws Exception { + Options options = new Options(); + options.set(TAG_AUTOMATIC_CREATION, TagCreationMode.WATERMARK); + options.set(TAG_CREATION_PERIOD, TagCreationPeriod.HOURLY); + options.set(TAG_NUM_RETAINED_MAX, 3); + options.set(TAG_TIME_RETAINED, Duration.ofMillis(500)); + FileStoreTable table = this.table.copy(options.toMap()); + TableCommitImpl commit = table.newCommit(commitUser).ignoreEmptyCommit(false); + TagManager tagManager = table.store().newTagManager(); + + // test normal creation + commit.commit(new ManifestCommittable(0, utcMills("2023-07-18T12:12:00"))); + commit.commit(new ManifestCommittable(1, utcMills("2023-07-18T14:00:00"))); + commit.commit(new ManifestCommittable(2, utcMills("2023-07-18T15:12:00"))); + commit.commit(new ManifestCommittable(3, utcMills("2023-07-18T16:00:00"))); + + // test expire old tag by time-retained + Thread.sleep(1000); + commit.commit(new ManifestCommittable(4, utcMills("2023-07-18T19:00:00"))); + assertThat(tagManager.allTagNames()).containsOnly("2023-07-18 18"); + + commit.close(); + } + private long localZoneMills(String timestamp) { return LocalDateTime.parse(timestamp) .atZone(ZoneId.systemDefault()) diff --git a/paimon-core/src/test/java/org/apache/paimon/tag/TagTest.java 
b/paimon-core/src/test/java/org/apache/paimon/tag/TagTest.java new file mode 100644 index 0000000000000..b97b008ff33eb --- /dev/null +++ b/paimon-core/src/test/java/org/apache/paimon/tag/TagTest.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.tag; + +import org.apache.paimon.Snapshot; + +import org.junit.Assert; +import org.junit.Test; + +import java.time.Duration; +import java.time.LocalDateTime; + +/** Test for {@link Tag}. 
*/ +public class TagTest { + + private final Snapshot snapshot = + new Snapshot( + 0, + 0L, + null, + null, + null, + null, + null, + 0L, + Snapshot.CommitKind.APPEND, + 1000, + null, + null, + null, + null, + null, + null); + + @Test + public void testFromJson() { + Tag tag = Tag.fromJson(snapshot.toJson()); + Assert.assertEquals( + "{\n" + + " \"id\" : 0,\n" + + " \"schemaId\" : 0,\n" + + " \"baseManifestList\" : null,\n" + + " \"deltaManifestList\" : null,\n" + + " \"changelogManifestList\" : null,\n" + + " \"commitUser\" : null,\n" + + " \"commitIdentifier\" : 0,\n" + + " \"commitKind\" : \"APPEND\",\n" + + " \"timeMillis\" : 1000,\n" + + " \"logOffsets\" : null,\n" + + " \"totalRecordCount\" : null,\n" + + " \"deltaRecordCount\" : null,\n" + + " \"changelogRecordCount\" : null,\n" + + " \"watermark\" : null,\n" + + " \"createTime\" : null,\n" + + " \"timeRetained\" : null,\n" + + " \"version\" : 3\n" + + "}", + tag.toJson()); + } + + @Test + public void testFromSnapshotAndTagTtl() { + Tag tag = + Tag.fromSnapshotAndTagTtl( + snapshot, + Duration.ofSeconds(5), + LocalDateTime.of(1969, 1, 1, 0, 0, 0, 123456789)); + Assert.assertEquals( + "{\n" + + " \"createTime\" : [ 1969, 1, 1, 0, 0, 0, 123456789 ],\n" + + " \"timeRetained\" : 5.000000000,\n" + + " \"version\" : 3,\n" + + " \"id\" : 0,\n" + + " \"schemaId\" : 0,\n" + + " \"baseManifestList\" : null,\n" + + " \"deltaManifestList\" : null,\n" + + " \"changelogManifestList\" : null,\n" + + " \"commitUser\" : null,\n" + + " \"commitIdentifier\" : 0,\n" + + " \"commitKind\" : \"APPEND\",\n" + + " \"timeMillis\" : 1000,\n" + + " \"logOffsets\" : null,\n" + + " \"totalRecordCount\" : null,\n" + + " \"deltaRecordCount\" : null,\n" + + " \"changelogRecordCount\" : null,\n" + + " \"watermark\" : null\n" + + "}", + tag.toJson()); + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagAction.java 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagAction.java index 0e500c3e74883..5928783c34a3c 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagAction.java @@ -18,12 +18,14 @@ package org.apache.paimon.flink.action; +import java.time.Duration; import java.util.Map; /** Create tag action for Flink. */ public class CreateTagAction extends TableActionBase { private final String tagName; + private final Duration timeRetained; private final Long snapshotId; public CreateTagAction( @@ -32,18 +34,20 @@ public CreateTagAction( String tableName, Map catalogConfig, String tagName, + Duration timeRetained, Long snapshotId) { super(warehouse, databaseName, tableName, catalogConfig); this.tagName = tagName; + this.timeRetained = timeRetained; this.snapshotId = snapshotId; } @Override public void run() throws Exception { if (snapshotId == null) { - table.createTag(tagName); + table.createTag(tagName, timeRetained); } else { - table.createTag(tagName, snapshotId); + table.createTag(tagName, timeRetained, snapshotId); } } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagActionFactory.java index 01bbdba42abff..2df28dbdde3b0 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagActionFactory.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagActionFactory.java @@ -18,11 +18,16 @@ package org.apache.paimon.flink.action; +import org.apache.paimon.utils.TimeUtils; + import org.apache.flink.api.java.tuple.Tuple3; +import java.time.Duration; import java.util.Map; import java.util.Optional; +import static 
org.apache.paimon.CoreOptions.DEFAULT_TAG_TIME_RETAINED; + /** Factory to create {@link CreateTagAction}. */ public class CreateTagActionFactory implements ActionFactory { @@ -30,6 +35,7 @@ public class CreateTagActionFactory implements ActionFactory { private static final String TAG_NAME = "tag_name"; private static final String SNAPSHOT = "snapshot"; + private static final String TIME_RETAINED = "time_retained"; @Override public String identifier() { @@ -49,9 +55,20 @@ public Optional create(MultipleParameterToolAdapter params) { snapshot = Long.parseLong(params.get(SNAPSHOT)); } + Duration timeRetained = DEFAULT_TAG_TIME_RETAINED; + if (params.has(TIME_RETAINED)) { + timeRetained = TimeUtils.parseDuration(params.get(TIME_RETAINED)); + } + CreateTagAction action = new CreateTagAction( - tablePath.f0, tablePath.f1, tablePath.f2, catalogConfig, tagName, snapshot); + tablePath.f0, + tablePath.f1, + tablePath.f2, + catalogConfig, + tagName, + timeRetained, + snapshot); return Optional.of(action); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateTagProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateTagProcedure.java index 9999fdf117761..a071573d26333 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateTagProcedure.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateTagProcedure.java @@ -26,11 +26,13 @@ import javax.annotation.Nullable; +import java.time.Duration; + /** * Create tag procedure. Usage: * *

- *  CALL sys.create_tag('tableId', 'tagName', snapshotId)
+ *  CALL sys.create_tag('tableId', 'tagName', timeRetained, snapshotId)
  * 
*/ public class CreateTagProcedure extends ProcedureBase { @@ -38,23 +40,32 @@ public class CreateTagProcedure extends ProcedureBase { public static final String IDENTIFIER = "create_tag"; public String[] call( - ProcedureContext procedureContext, String tableId, String tagName, long snapshotId) + ProcedureContext procedureContext, + String tableId, + String tagName, + Duration timeRetained, + long snapshotId) throws Catalog.TableNotExistException { - return innerCall(tableId, tagName, snapshotId); + return innerCall(tableId, tagName, timeRetained, snapshotId); } - public String[] call(ProcedureContext procedureContext, String tableId, String tagName) + public String[] call( + ProcedureContext procedureContext, + String tableId, + String tagName, + Duration timeRetained) throws Catalog.TableNotExistException { - return innerCall(tableId, tagName, null); + return innerCall(tableId, tagName, timeRetained, null); } - private String[] innerCall(String tableId, String tagName, @Nullable Long snapshotId) + private String[] innerCall( + String tableId, String tagName, Duration timeRetained, @Nullable Long snapshotId) throws Catalog.TableNotExistException { Table table = catalog.getTable(Identifier.fromString(tableId)); if (snapshotId == null) { - table.createTag(tagName); + table.createTag(tagName, timeRetained); } else { - table.createTag(tagName, snapshotId); + table.createTag(tagName, timeRetained, snapshotId); } return new String[] {"Success"}; } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperator.java index dcf2c8b0045cd..6d27c60194837 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperator.java +++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperator.java @@ -46,6 +46,7 @@ import org.apache.flink.streaming.runtime.tasks.StreamTask; import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus; +import java.time.Duration; import java.util.ArrayList; import java.util.List; import java.util.NavigableSet; @@ -76,6 +77,8 @@ public class AutoTagForSavepointCommitterOperator private final NavigableSet identifiersForTags; + private final Duration tagTimeRetained; + private transient SnapshotManager snapshotManager; private transient TagManager tagManager; @@ -91,13 +94,15 @@ public AutoTagForSavepointCommitterOperator( SerializableSupplier snapshotManagerFactory, SerializableSupplier tagManagerFactory, SerializableSupplier tagDeletionFactory, - SerializableSupplier> callbacksSupplier) { + SerializableSupplier> callbacksSupplier, + Duration tagTimeRetained) { this.commitOperator = commitOperator; this.tagManagerFactory = tagManagerFactory; this.snapshotManagerFactory = snapshotManagerFactory; this.tagDeletionFactory = tagDeletionFactory; this.callbacksSupplier = callbacksSupplier; this.identifiersForTags = new TreeSet<>(); + this.tagTimeRetained = tagTimeRetained; } @Override @@ -167,7 +172,7 @@ private void createTagForIdentifiers(List identifiers) { for (Snapshot snapshot : snapshotForTags) { String tagName = SAVEPOINT_TAG_PREFIX + snapshot.commitIdentifier(); if (!tagManager.tagExists(tagName)) { - tagManager.createTag(snapshot, tagName, callbacks); + tagManager.createTag(snapshot, tagName, tagTimeRetained, callbacks); } } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperator.java index 2c898831ec2ce..93eac0431be64 100644 --- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperator.java @@ -118,7 +118,11 @@ private void createTag() { tagName, tagDeletion, snapshotManager, table.store().createTagCallbacks()); } // Create a new tag - tagManager.createTag(snapshot, tagName, table.store().createTagCallbacks()); + tagManager.createTag( + snapshot, + tagName, + table.coreOptions().tagTimeRetained(), + table.store().createTagCallbacks()); // Expire the tag expireTag(); } catch (Exception e) { diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java index 97c426ee56854..3a4ddcb5dac1f 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java @@ -242,7 +242,8 @@ protected DataStreamSink doCommit(DataStream written, String com table::snapshotManager, table::tagManager, () -> table.store().newTagDeletion(), - () -> table.store().createTagCallbacks()); + () -> table.store().createTagCallbacks(), + table.coreOptions().tagTimeRetained()); } if (conf.get(ExecutionOptions.RUNTIME_MODE) == RuntimeExecutionMode.BATCH && table.coreOptions().tagCreationMode() == TagCreationMode.BATCH) { diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperatorTest.java index 0dbde05795fd3..4090444583207 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperatorTest.java +++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperatorTest.java @@ -39,6 +39,7 @@ import java.util.List; import java.util.Map; +import static org.apache.paimon.CoreOptions.DEFAULT_TAG_TIME_RETAINED; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.fail; @@ -208,7 +209,8 @@ protected OneInputStreamOperator createCommitterOperat table::snapshotManager, table::tagManager, () -> table.store().newTagDeletion(), - () -> table.store().createTagCallbacks()); + () -> table.store().createTagCallbacks(), + DEFAULT_TAG_TIME_RETAINED); } @Override @@ -224,6 +226,7 @@ protected OneInputStreamOperator createCommitterOperat table::snapshotManager, table::tagManager, () -> table.store().newTagDeletion(), - () -> table.store().createTagCallbacks()); + () -> table.store().createTagCallbacks(), + DEFAULT_TAG_TIME_RETAINED); } } diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateTagProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateTagProcedure.java index 7f9bdb62108b6..c1db61a7a00bd 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateTagProcedure.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateTagProcedure.java @@ -18,6 +18,8 @@ package org.apache.paimon.spark.procedure; +import org.apache.paimon.utils.TimeUtils; + import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.connector.catalog.Identifier; import org.apache.spark.sql.connector.catalog.TableCatalog; @@ -26,6 +28,9 @@ import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; +import java.time.Duration; + +import static org.apache.paimon.CoreOptions.DEFAULT_TAG_TIME_RETAINED; import static org.apache.spark.sql.types.DataTypes.LongType; import static 
org.apache.spark.sql.types.DataTypes.StringType; @@ -36,6 +41,7 @@ public class CreateTagProcedure extends BaseProcedure { new ProcedureParameter[] { ProcedureParameter.required("table", StringType), ProcedureParameter.required("tag", StringType), + ProcedureParameter.optional("time_retained", StringType), ProcedureParameter.optional("snapshot", LongType) }; @@ -63,15 +69,19 @@ public StructType outputType() { public InternalRow[] call(InternalRow args) { Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name()); String tag = args.getString(1); - Long snapshot = args.isNullAt(2) ? null : args.getLong(2); + Duration timeRetained = + args.isNullAt(2) + ? DEFAULT_TAG_TIME_RETAINED + : TimeUtils.parseDuration(args.getString(2)); + Long snapshot = args.isNullAt(3) ? null : args.getLong(3); return modifyPaimonTable( tableIdent, table -> { if (snapshot == null) { - table.createTag(tag); + table.createTag(tag, timeRetained); } else { - table.createTag(tag, snapshot); + table.createTag(tag, timeRetained, snapshot); } InternalRow outputRow = newInternalRow(true); return new InternalRow[] {outputRow};