diff --git a/docs/content/maintenance/manage-tags.md b/docs/content/maintenance/manage-tags.md index fc499f607173..75a4496659ff 100644 --- a/docs/content/maintenance/manage-tags.md +++ b/docs/content/maintenance/manage-tags.md @@ -109,6 +109,7 @@ You can create a tag with given name and snapshot ID. --table \ --tag_name \ [--snapshot ] \ + [--time_retained ] \ [--catalog_conf [--catalog_conf ...]] ``` @@ -126,6 +127,7 @@ public class CreateTag { public static void main(String[] args) { Table table = ...; table.createTag("my-tag", 1); + table.createTag("my-tag-retained-12-hours", 1, Duration.ofHours(12)); } } ``` @@ -138,6 +140,11 @@ Run the following sql: CALL create_tag(table => 'test.t', tag => 'test_tag', snapshot => 2); ``` +To create a tag with retained 1 day, run the following sql: +```sql +CALL create_tag(table => 'test.t', tag => 'test_tag', snapshot => 2, time_retained => '1 d'); +``` + To create a tag based on the latest snapshot id, run the following sql: ```sql CALL create_tag(table => 'test.t', tag => 'test_tag'); diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html index 4c42bbfbbb58..cab6878bb3a3 100644 --- a/docs/layouts/shortcodes/generated/core_configuration.html +++ b/docs/layouts/shortcodes/generated/core_configuration.html @@ -678,6 +678,12 @@

Enum

What frequency is used to generate tags.

Possible values:
  • "daily": Generate a tag every day.
  • "hourly": Generate a tag every hour.
  • "two-hours": Generate a tag every two hours.
+ +
tag.default-time-retained
+ (none) + Duration + The default maximum time retained for newly created tags. +
tag.num-retained-max
(none) diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java index 37bd0c217f57..a302a3d76df1 100644 --- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java @@ -1014,6 +1014,12 @@ public class CoreOptions implements Serializable { .noDefaultValue() .withDescription("The maximum number of tags to retain."); + public static final ConfigOption TAG_DEFAULT_TIME_RETAINED = + key("tag.default-time-retained") + .durationType() + .noDefaultValue() + .withDescription("The default maximum time retained for newly created tags."); + public static final ConfigOption SNAPSHOT_WATERMARK_IDLE_TIMEOUT = key("snapshot.watermark-idle-timeout") .durationType() @@ -1643,10 +1649,15 @@ public TagPeriodFormatter tagPeriodFormatter() { return options.get(TAG_PERIOD_FORMATTER); } + @Nullable public Integer tagNumRetainedMax() { return options.get(TAG_NUM_RETAINED_MAX); } + public Duration tagDefaultTimeRetained() { + return options.get(TAG_DEFAULT_TIME_RETAINED); + } + public Duration snapshotWatermarkIdleTimeout() { return options.get(SNAPSHOT_WATERMARK_IDLE_TIMEOUT); } diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java index 87cc4e65c544..07a73d3de2aa 100644 --- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java @@ -41,7 +41,7 @@ import org.apache.paimon.table.CatalogEnvironment; import org.apache.paimon.table.sink.CallbackUtils; import org.apache.paimon.table.sink.TagCallback; -import org.apache.paimon.tag.TagAutoCreation; +import org.apache.paimon.tag.TagAutoManager; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.FileStorePathFactory; import org.apache.paimon.utils.SegmentsCache; @@ -248,9 +248,8 @@ public PartitionExpire newPartitionExpire(String commitUser) { } @Override - @Nullable - public TagAutoCreation newTagCreationManager() { - return TagAutoCreation.create( + public TagAutoManager newTagCreationManager() { + return TagAutoManager.create( options, snapshotManager(), newTagManager(), diff --git a/paimon-core/src/main/java/org/apache/paimon/FileStore.java b/paimon-core/src/main/java/org/apache/paimon/FileStore.java index 6731121c567b..870feffdef68 100644 --- a/paimon-core/src/main/java/org/apache/paimon/FileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/FileStore.java @@ -33,7 +33,7 @@ import org.apache.paimon.stats.StatsFileHandler; import org.apache.paimon.table.BucketMode; import org.apache.paimon.table.sink.TagCallback; -import org.apache.paimon.tag.TagAutoCreation; +import org.apache.paimon.tag.TagAutoManager; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.FileStorePathFactory; import org.apache.paimon.utils.SnapshotManager; @@ -92,8 +92,7 @@ public interface FileStore extends Serializable { @Nullable PartitionExpire newPartitionExpire(String commitUser); - @Nullable - TagAutoCreation newTagCreationManager(); + TagAutoManager newTagCreationManager(); ServiceManager newServiceManager(); diff --git a/paimon-core/src/main/java/org/apache/paimon/Snapshot.java b/paimon-core/src/main/java/org/apache/paimon/Snapshot.java index 0ac23ecf05dc..fefc41058774 100644 --- a/paimon-core/src/main/java/org/apache/paimon/Snapshot.java +++ b/paimon-core/src/main/java/org/apache/paimon/Snapshot.java @@ -42,7 +42,6 @@ import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.Optional; /** * This file is the entrance to all data committed at some specific time point. @@ -95,37 +94,37 @@ public class Snapshot { // null for paimon <= 0.2 @JsonProperty(FIELD_VERSION) @Nullable - private final Integer version; + protected final Integer version; @JsonProperty(FIELD_ID) - private final long id; + protected final long id; @JsonProperty(FIELD_SCHEMA_ID) - private final long schemaId; + protected final long schemaId; // a manifest list recording all changes from the previous snapshots @JsonProperty(FIELD_BASE_MANIFEST_LIST) - private final String baseManifestList; + protected final String baseManifestList; // a manifest list recording all new changes occurred in this snapshot // for faster expire and streaming reads @JsonProperty(FIELD_DELTA_MANIFEST_LIST) - private final String deltaManifestList; + protected final String deltaManifestList; // a manifest list recording all changelog produced in this snapshot // null if no changelog is produced, or for paimon <= 0.2 @JsonProperty(FIELD_CHANGELOG_MANIFEST_LIST) @Nullable - private final String changelogManifestList; + protected final String changelogManifestList; // a manifest recording all index files of this table // null if no index file @JsonProperty(FIELD_INDEX_MANIFEST) @JsonInclude(JsonInclude.Include.NON_NULL) - private final String indexManifest; + protected final String indexManifest; @JsonProperty(FIELD_COMMIT_USER) - private final String commitUser; + protected final String commitUser; // Mainly for snapshot deduplication. // @@ -135,37 +134,37 @@ public class Snapshot { // If snapshot A has a smaller commitIdentifier than snapshot B, then snapshot A must be // committed before snapshot B, and thus snapshot A must contain older records than snapshot B. @JsonProperty(FIELD_COMMIT_IDENTIFIER) - private final long commitIdentifier; + protected final long commitIdentifier; @JsonProperty(FIELD_COMMIT_KIND) - private final CommitKind commitKind; + protected final CommitKind commitKind; @JsonProperty(FIELD_TIME_MILLIS) - private final long timeMillis; + protected final long timeMillis; @JsonProperty(FIELD_LOG_OFFSETS) @JsonInclude(JsonInclude.Include.NON_NULL) @Nullable - private final Map logOffsets; + protected final Map logOffsets; // record count of all changes occurred in this snapshot // null for paimon <= 0.3 @JsonProperty(FIELD_TOTAL_RECORD_COUNT) @Nullable - private final Long totalRecordCount; + protected final Long totalRecordCount; // record count of all new changes occurred in this snapshot // null for paimon <= 0.3 @JsonProperty(FIELD_DELTA_RECORD_COUNT) @Nullable - private final Long deltaRecordCount; + protected final Long deltaRecordCount; // record count of all changelog produced in this snapshot // null for paimon <= 0.3 @JsonProperty(FIELD_CHANGELOG_RECORD_COUNT) @JsonInclude(JsonInclude.Include.NON_NULL) @Nullable - private final Long changelogRecordCount; + protected final Long changelogRecordCount; // watermark for input records // null for paimon <= 0.3 @@ -174,14 +173,14 @@ public class Snapshot { @JsonProperty(FIELD_WATERMARK) @JsonInclude(JsonInclude.Include.NON_NULL) @Nullable - private final Long watermark; + protected final Long watermark; // stats file name for statistics of this table // null if no stats file @JsonInclude(JsonInclude.Include.NON_NULL) @JsonProperty(FIELD_STATISTICS) @Nullable - private final String statistics; + protected final String statistics; public Snapshot( long id, @@ -447,12 +446,13 @@ public static Snapshot fromPath(FileIO fileIO, Path path) { } } - public static Optional safelyFromPath(FileIO fileIO, Path path) throws IOException { + @Nullable + public static Snapshot safelyFromPath(FileIO fileIO, Path path) throws IOException { try { String json = fileIO.readFileUtf8(path); - return Optional.of(Snapshot.fromJson(json)); + return Snapshot.fromJson(json); } catch (FileNotFoundException e) { - return Optional.empty(); + return null; } } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java index 83508adf083f..e389a471c4ec 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java @@ -1231,7 +1231,7 @@ static ConflictCheck noConflictCheck() { return latestSnapshot -> false; } - static ConflictCheck mustConflictCheck() { + public static ConflictCheck mustConflictCheck() { return latestSnapshot -> true; } } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java index 70e33ddb32f4..1ca321d563a5 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java @@ -60,6 +60,7 @@ import java.io.IOException; import java.io.UncheckedIOException; +import java.time.Duration; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -461,8 +462,7 @@ public void rollbackTo(long snapshotId) { rollbackHelper().cleanLargerThan(snapshotManager.snapshot(snapshotId)); } - @Override - public void createTag(String tagName, long fromSnapshotId) { + public Snapshot createTagInternal(long fromSnapshotId) { SnapshotManager snapshotManager = snapshotManager(); Snapshot snapshot = null; if (snapshotManager.snapshotExists(fromSnapshotId)) { @@ -482,18 +482,36 @@ public void createTag(String tagName, long fromSnapshotId) { snapshot != null, "Cannot create tag because given snapshot #%s doesn't exist.", fromSnapshotId); - createTag(tagName, snapshot); + return snapshot; + } + + @Override + public void createTag(String tagName, long fromSnapshotId) { + createTag( + tagName, createTagInternal(fromSnapshotId), coreOptions().tagDefaultTimeRetained()); + } + + @Override + public void createTag(String tagName, long fromSnapshotId, Duration timeRetained) { + createTag(tagName, createTagInternal(fromSnapshotId), timeRetained); } @Override public void createTag(String tagName) { Snapshot latestSnapshot = snapshotManager().latestSnapshot(); checkNotNull(latestSnapshot, "Cannot create tag because latest snapshot doesn't exist."); - createTag(tagName, latestSnapshot); + createTag(tagName, latestSnapshot, coreOptions().tagDefaultTimeRetained()); + } + + @Override + public void createTag(String tagName, Duration timeRetained) { + Snapshot latestSnapshot = snapshotManager().latestSnapshot(); + checkNotNull(latestSnapshot, "Cannot create tag because latest snapshot doesn't exist."); + createTag(tagName, latestSnapshot, timeRetained); } - private void createTag(String tagName, Snapshot fromSnapshot) { - tagManager().createTag(fromSnapshot, tagName, store().createTagCallbacks()); + private void createTag(String tagName, Snapshot fromSnapshot, @Nullable Duration timeRetained) { + tagManager().createTag(fromSnapshot, tagName, timeRetained, store().createTagCallbacks()); } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java b/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java index d07035f2651e..be4976f1003e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java @@ -25,6 +25,7 @@ import org.apache.paimon.table.sink.StreamWriteBuilder; import org.apache.paimon.table.source.InnerStreamTableScan; +import java.time.Duration; import java.util.Collections; import java.util.List; import java.util.Map; @@ -109,6 +110,14 @@ default void createTag(String tagName, long fromSnapshotId) { this.getClass().getSimpleName())); } + @Override + default void createTag(String tagName, long fromSnapshotId, Duration timeRetained) { + throw new UnsupportedOperationException( + String.format( + "Readonly Table %s does not support createTag.", + this.getClass().getSimpleName())); + } + @Override default void createTag(String tagName) { throw new UnsupportedOperationException( @@ -117,6 +126,14 @@ default void createTag(String tagName) { this.getClass().getSimpleName())); } + @Override + default void createTag(String tagName, Duration timeRetained) { + throw new UnsupportedOperationException( + String.format( + "Readonly Table %s does not support createTag.", + this.getClass().getSimpleName())); + } + @Override default void deleteTag(String tagName) { throw new UnsupportedOperationException( diff --git a/paimon-core/src/main/java/org/apache/paimon/table/Table.java b/paimon-core/src/main/java/org/apache/paimon/table/Table.java index 3ed6a1990e5f..3650b773cb5a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/Table.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/Table.java @@ -27,6 +27,7 @@ import org.apache.paimon.types.RowType; import java.io.Serializable; +import java.time.Duration; import java.util.List; import java.util.Map; import java.util.Optional; @@ -76,10 +77,16 @@ public interface Table extends Serializable { @Experimental void createTag(String tagName, long fromSnapshotId); + @Experimental + void createTag(String tagName, long fromSnapshotId, Duration timeRetained); + /** Create a tag from latest snapshot. */ @Experimental void createTag(String tagName); + @Experimental + void createTag(String tagName, Duration timeRetained); + /** Delete a tag by name. */ @Experimental void deleteTag(String tagName); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java index c63511ac4652..eab7a0c6d036 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java @@ -31,7 +31,7 @@ import org.apache.paimon.operation.Lock; import org.apache.paimon.operation.PartitionExpire; import org.apache.paimon.operation.metrics.CommitMetrics; -import org.apache.paimon.tag.TagAutoCreation; +import org.apache.paimon.tag.TagAutoManager; import org.apache.paimon.utils.ExecutorThreadFactory; import org.apache.paimon.utils.FileUtils; import org.apache.paimon.utils.IOUtils; @@ -76,7 +76,7 @@ public class TableCommitImpl implements InnerTableCommit { private final List commitCallbacks; @Nullable private final Runnable expireSnapshots; @Nullable private final PartitionExpire partitionExpire; - @Nullable private final TagAutoCreation tagAutoCreation; + @Nullable private final TagAutoManager tagAutoManager; private final Lock lock; @Nullable private final Duration consumerExpireTime; @@ -96,7 +96,7 @@ public TableCommitImpl( List commitCallbacks, @Nullable Runnable expireSnapshots, @Nullable PartitionExpire partitionExpire, - @Nullable TagAutoCreation tagAutoCreation, + @Nullable TagAutoManager tagAutoManager, Lock lock, @Nullable Duration consumerExpireTime, ConsumerManager consumerManager, @@ -112,7 +112,7 @@ public TableCommitImpl( this.commitCallbacks = commitCallbacks; this.expireSnapshots = expireSnapshots; this.partitionExpire = partitionExpire; - this.tagAutoCreation = tagAutoCreation; + this.tagAutoManager = tagAutoManager; this.lock = lock; this.consumerExpireTime = consumerExpireTime; @@ -131,8 +131,12 @@ public TableCommitImpl( } public boolean forceCreatingSnapshot() { - return this.forceCreatingSnapshot - || (tagAutoCreation != null && tagAutoCreation.forceCreatingSnapshot()); + if (this.forceCreatingSnapshot) { + return true; + } + return tagAutoManager != null + && tagAutoManager.getTagAutoCreation() != null + && tagAutoManager.getTagAutoCreation().forceCreatingSnapshot(); } @Override @@ -349,8 +353,8 @@ private void expire(long partitionExpireIdentifier) { partitionExpire.expire(partitionExpireIdentifier); } - if (tagAutoCreation != null) { - tagAutoCreation.run(); + if (tagAutoManager != null) { + tagAutoManager.run(); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java index ebd8aa7af0db..21c9cf495fb3 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java @@ -19,7 +19,6 @@ package org.apache.paimon.table.system; import org.apache.paimon.CoreOptions; -import org.apache.paimon.Snapshot; import org.apache.paimon.data.BinaryString; import org.apache.paimon.data.GenericRow; import org.apache.paimon.data.InternalRow; @@ -39,18 +38,21 @@ import org.apache.paimon.table.source.ReadOnceTableScan; import org.apache.paimon.table.source.Split; import org.apache.paimon.table.source.TableRead; +import org.apache.paimon.tag.Tag; import org.apache.paimon.types.BigIntType; import org.apache.paimon.types.DataField; import org.apache.paimon.types.RowType; import org.apache.paimon.types.TimestampType; import org.apache.paimon.utils.DateTimeUtils; import org.apache.paimon.utils.IteratorRecordReader; +import org.apache.paimon.utils.Pair; import org.apache.paimon.utils.ProjectedRow; import org.apache.paimon.utils.SerializationUtils; import org.apache.paimon.utils.TagManager; import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators; +import java.time.LocalDateTime; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -60,7 +62,6 @@ import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.SortedMap; import static org.apache.paimon.catalog.Catalog.SYSTEM_TABLE_SPLITTER; @@ -79,7 +80,10 @@ public class TagsTable implements ReadonlyTable { new DataField(2, "schema_id", new BigIntType(false)), new DataField(3, "commit_time", new TimestampType(false, 3)), new DataField(4, "record_count", new BigIntType(true)), - new DataField(5, "branches", SerializationUtils.newStringType(true)))); + new DataField(5, "branches", SerializationUtils.newStringType(true)), + new DataField(6, "create_time", new TimestampType(false, 3)), + new DataField( + 7, "time_retained", SerializationUtils.newStringType(true)))); private final FileIO fileIO; private final Path location; @@ -204,12 +208,10 @@ public RecordReader createReader(Split split) { Options options = new Options(); options.set(CoreOptions.PATH, location.toUri().toString()); FileStoreTable table = FileStoreTableFactory.create(fileIO, options); - SortedMap> tags = table.tagManager().tags(); - Map nameToSnapshot = new LinkedHashMap<>(); - for (Map.Entry> tag : tags.entrySet()) { - for (String tagName : tag.getValue()) { - nameToSnapshot.put(tagName, tag.getKey()); - } + List> tags = table.tagManager().tagObjects(); + Map nameToSnapshot = new LinkedHashMap<>(); + for (Pair tag : tags) { + nameToSnapshot.put(tag.getValue(), tag.getKey()); } Map> tagBranches = new HashMap<>(); table.branchManager() @@ -234,17 +236,24 @@ public RecordReader createReader(Split split) { } private InternalRow toRow( - Map.Entry tag, Map> tagBranches) { - Snapshot snapshot = tag.getValue(); - List branches = tagBranches.get(tag.getKey()); + Map.Entry snapshot, Map> tagBranches) { + Tag tag = snapshot.getValue(); + List branches = tagBranches.get(snapshot.getKey()); return GenericRow.of( - BinaryString.fromString(tag.getKey()), - snapshot.id(), - snapshot.schemaId(), + BinaryString.fromString(snapshot.getKey()), + tag.id(), + tag.schemaId(), + Timestamp.fromLocalDateTime(DateTimeUtils.toLocalDateTime(tag.timeMillis())), + tag.totalRecordCount(), + BinaryString.fromString(branches == null ? "[]" : branches.toString()), Timestamp.fromLocalDateTime( - DateTimeUtils.toLocalDateTime(snapshot.timeMillis())), - snapshot.totalRecordCount(), - BinaryString.fromString(branches == null ? "[]" : branches.toString())); + tag.getTagCreateTime() == null + ? LocalDateTime.MIN + : tag.getTagCreateTime()), + BinaryString.fromString( + tag.getTagTimeRetained() == null + ? "" + : tag.getTagTimeRetained().toString())); } } } diff --git a/paimon-core/src/main/java/org/apache/paimon/tag/Tag.java b/paimon-core/src/main/java/org/apache/paimon/tag/Tag.java new file mode 100644 index 000000000000..938bb417defa --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/tag/Tag.java @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.tag; + +import org.apache.paimon.Snapshot; +import org.apache.paimon.fs.FileIO; +import org.apache.paimon.fs.Path; +import org.apache.paimon.utils.JsonSerdeUtil; + +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonInclude; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty; + +import javax.annotation.Nullable; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.time.Duration; +import java.time.LocalDateTime; +import java.util.Map; +import java.util.Objects; + +/** Snapshot with tagCreateTime and tagTimeRetained. */ +public class Tag extends Snapshot { + + private static final String FIELD_TAG_CREATE_TIME = "tagCreateTime"; + private static final String FIELD_TAG_TIME_RETAINED = "tagTimeRetained"; + + @JsonInclude(JsonInclude.Include.NON_NULL) + @JsonProperty(FIELD_TAG_CREATE_TIME) + @Nullable + private final LocalDateTime tagCreateTime; + + @JsonInclude(JsonInclude.Include.NON_NULL) + @JsonProperty(FIELD_TAG_TIME_RETAINED) + @Nullable + private final Duration tagTimeRetained; + + @JsonCreator + public Tag( + @JsonProperty(FIELD_VERSION) @Nullable Integer version, + @JsonProperty(FIELD_ID) long id, + @JsonProperty(FIELD_SCHEMA_ID) long schemaId, + @JsonProperty(FIELD_BASE_MANIFEST_LIST) String baseManifestList, + @JsonProperty(FIELD_DELTA_MANIFEST_LIST) String deltaManifestList, + @JsonProperty(FIELD_CHANGELOG_MANIFEST_LIST) @Nullable String changelogManifestList, + @JsonProperty(FIELD_INDEX_MANIFEST) @Nullable String indexManifest, + @JsonProperty(FIELD_COMMIT_USER) String commitUser, + @JsonProperty(FIELD_COMMIT_IDENTIFIER) long commitIdentifier, + @JsonProperty(FIELD_COMMIT_KIND) CommitKind commitKind, + @JsonProperty(FIELD_TIME_MILLIS) long timeMillis, + @JsonProperty(FIELD_LOG_OFFSETS) Map logOffsets, + @JsonProperty(FIELD_TOTAL_RECORD_COUNT) @Nullable Long totalRecordCount, + @JsonProperty(FIELD_DELTA_RECORD_COUNT) @Nullable Long deltaRecordCount, + @JsonProperty(FIELD_CHANGELOG_RECORD_COUNT) @Nullable Long changelogRecordCount, + @JsonProperty(FIELD_WATERMARK) @Nullable Long watermark, + @JsonProperty(FIELD_STATISTICS) @Nullable String statistics, + @JsonProperty(FIELD_TAG_CREATE_TIME) @Nullable LocalDateTime tagCreateTime, + @JsonProperty(FIELD_TAG_TIME_RETAINED) @Nullable Duration tagTimeRetained) { + super( + version, + id, + schemaId, + baseManifestList, + deltaManifestList, + changelogManifestList, + indexManifest, + commitUser, + commitIdentifier, + commitKind, + timeMillis, + logOffsets, + totalRecordCount, + deltaRecordCount, + changelogRecordCount, + watermark, + statistics); + this.tagCreateTime = tagCreateTime; + this.tagTimeRetained = tagTimeRetained; + } + + @JsonGetter(FIELD_TAG_CREATE_TIME) + public @Nullable LocalDateTime getTagCreateTime() { + return tagCreateTime; + } + + @JsonGetter(FIELD_TAG_TIME_RETAINED) + public @Nullable Duration getTagTimeRetained() { + return tagTimeRetained; + } + + @Override + public String toJson() { + return JsonSerdeUtil.toJson(this); + } + + public static Tag fromJson(String json) { + return JsonSerdeUtil.fromJson(json, Tag.class); + } + + public static Tag fromPath(FileIO fileIO, Path path) { + try { + String json = fileIO.readFileUtf8(path); + return Tag.fromJson(json); + } catch (IOException e) { + throw new RuntimeException("Fails to read tag from path " + path, e); + } + } + + @Nullable + public static Tag safelyFromPath(FileIO fileIO, Path path) throws IOException { + try { + String json = fileIO.readFileUtf8(path); + return Tag.fromJson(json); + } catch (FileNotFoundException e) { + return null; + } + } + + public static Tag fromSnapshotAndTagTtl( + Snapshot snapshot, Duration tagTimeRetained, LocalDateTime tagCreateTime) { + return new Tag( + snapshot.version(), + snapshot.id(), + snapshot.schemaId(), + snapshot.baseManifestList(), + snapshot.deltaManifestList(), + snapshot.changelogManifestList(), + snapshot.indexManifest(), + snapshot.commitUser(), + snapshot.commitIdentifier(), + snapshot.commitKind(), + snapshot.timeMillis(), + snapshot.logOffsets(), + snapshot.totalRecordCount(), + snapshot.deltaRecordCount(), + snapshot.changelogRecordCount(), + snapshot.watermark(), + snapshot.statistics(), + tagCreateTime, + tagTimeRetained); + } + + public Snapshot trimToSnapshot() { + return new Snapshot( + version, + id, + schemaId, + baseManifestList, + deltaManifestList, + changelogManifestList, + indexManifest, + commitUser, + commitIdentifier, + commitKind, + timeMillis, + logOffsets, + totalRecordCount, + deltaRecordCount, + changelogRecordCount, + watermark, + statistics); + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), tagCreateTime, tagTimeRetained); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + if (!super.equals(o)) { + return false; + } + Tag that = (Tag) o; + return Objects.equals(tagCreateTime, that.tagCreateTime) + && Objects.equals(tagTimeRetained, that.tagTimeRetained); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java b/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java index 505454313bd2..409ceb3dc63d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java +++ b/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java @@ -27,6 +27,9 @@ import org.apache.paimon.utils.SnapshotManager; import org.apache.paimon.utils.TagManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import javax.annotation.Nullable; import java.time.Duration; @@ -44,13 +47,16 @@ /** A manager to create tags automatically. */ public class TagAutoCreation { + private static final Logger LOG = LoggerFactory.getLogger(TagAutoCreation.class); + private final SnapshotManager snapshotManager; private final TagManager tagManager; private final TagDeletion tagDeletion; private final TagTimeExtractor timeExtractor; private final TagPeriodHandler periodHandler; private final Duration delay; - private final Integer numRetainedMax; + @Nullable private final Integer numRetainedMax; + @Nullable private final Duration defaultTimeRetained; private final List callbacks; private final Duration idlenessTimeout; @@ -64,7 +70,8 @@ private TagAutoCreation( TagTimeExtractor timeExtractor, TagPeriodHandler periodHandler, Duration delay, - Integer numRetainedMax, + @Nullable Integer numRetainedMax, + @Nullable Duration defaultTimeRetained, Duration idlenessTimeout, List callbacks) { this.snapshotManager = snapshotManager; @@ -74,6 +81,7 @@ private TagAutoCreation( this.periodHandler = periodHandler; this.delay = delay; this.numRetainedMax = numRetainedMax; + this.defaultTimeRetained = defaultTimeRetained; this.callbacks = callbacks; this.idlenessTimeout = idlenessTimeout; @@ -118,7 +126,7 @@ public boolean forceCreatingSnapshot() { public void run() { while (true) { if (snapshotManager.snapshotExists(nextSnapshot)) { - tryToTag(snapshotManager.snapshot(nextSnapshot)); + tryToCreateTags(snapshotManager.snapshot(nextSnapshot)); nextSnapshot++; } else { // avoid snapshot has been expired @@ -132,7 +140,7 @@ public void run() { } } - private void tryToTag(Snapshot snapshot) { + private void tryToCreateTags(Snapshot snapshot) { Optional timeOptional = timeExtractor.extract(snapshot.timeMillis(), snapshot.watermark()); if (!timeOptional.isPresent()) { @@ -144,7 +152,7 @@ private void tryToTag(Snapshot snapshot) { || isAfterOrEqual(time.minus(delay), periodHandler.nextTagTime(nextTag))) { LocalDateTime thisTag = periodHandler.normalizeToPreviousTag(time); String tagName = periodHandler.timeToTag(thisTag); - tagManager.createTag(snapshot, tagName, callbacks); + tagManager.createTag(snapshot, tagName, defaultTimeRetained, callbacks); nextTag = periodHandler.nextTagTime(thisTag); if (numRetainedMax != null) { @@ -154,6 +162,10 @@ private void tryToTag(Snapshot snapshot) { int toDelete = tags.size() - numRetainedMax; int i = 0; for (List tag : tags.values()) { + LOG.info( + "Delete tag {}, because the number of auto-created tags reached numRetainedMax of {}.", + tagName, + numRetainedMax); tagManager.deleteTag( checkAndGetOneAutoTag(tag), tagDeletion, @@ -201,6 +213,7 @@ public static TagAutoCreation create( TagPeriodHandler.create(options), options.tagCreationDelay(), options.tagNumRetainedMax(), + options.tagDefaultTimeRetained(), options.snapshotWatermarkIdleTimeout(), callbacks); } diff --git a/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoManager.java b/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoManager.java new file mode 100644 index 000000000000..387a3e746adc --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoManager.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.tag; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.operation.TagDeletion; +import org.apache.paimon.table.sink.TagCallback; +import org.apache.paimon.utils.SnapshotManager; +import org.apache.paimon.utils.TagManager; + +import java.util.List; + +/** A manager to create and expire tags. */ +public class TagAutoManager { + + private final TagAutoCreation tagAutoCreation; + private final TagTimeExpire tagTimeExpire; + + private TagAutoManager(TagAutoCreation tagAutoCreation, TagTimeExpire tagTimeExpire) { + this.tagAutoCreation = tagAutoCreation; + this.tagTimeExpire = tagTimeExpire; + } + + public void run() { + if (tagAutoCreation != null) { + tagAutoCreation.run(); + } + if (tagTimeExpire != null) { + tagTimeExpire.run(); + } + } + + public static TagAutoManager create( + CoreOptions options, + SnapshotManager snapshotManager, + TagManager tagManager, + TagDeletion tagDeletion, + List callbacks) { + TagTimeExtractor extractor = TagTimeExtractor.createForAutoTag(options); + + return new TagAutoManager( + extractor == null + ? null + : TagAutoCreation.create( + options, snapshotManager, tagManager, tagDeletion, callbacks), + TagTimeExpire.create(snapshotManager, tagManager, tagDeletion, callbacks)); + } + + public TagAutoCreation getTagAutoCreation() { + return tagAutoCreation; + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/tag/TagTimeExpire.java b/paimon-core/src/main/java/org/apache/paimon/tag/TagTimeExpire.java new file mode 100644 index 000000000000..d4797c0cb056 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/tag/TagTimeExpire.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.tag; + +import org.apache.paimon.operation.TagDeletion; +import org.apache.paimon.table.sink.TagCallback; +import org.apache.paimon.utils.Pair; +import org.apache.paimon.utils.SnapshotManager; +import org.apache.paimon.utils.TagManager; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.time.LocalDateTime; +import java.util.List; + +/** A manager to expire tags by time. */ +public class TagTimeExpire { + + private static final Logger LOG = LoggerFactory.getLogger(TagTimeExpire.class); + + private final SnapshotManager snapshotManager; + private final TagManager tagManager; + private final TagDeletion tagDeletion; + private final List callbacks; + + private TagTimeExpire( + SnapshotManager snapshotManager, + TagManager tagManager, + TagDeletion tagDeletion, + List callbacks) { + this.snapshotManager = snapshotManager; + this.tagManager = tagManager; + this.tagDeletion = tagDeletion; + this.callbacks = callbacks; + } + + public void run() { + List> tags = tagManager.tagObjects(); + for (Pair pair : tags) { + Tag tag = pair.getLeft(); + String tagName = pair.getRight(); + LocalDateTime createTime = tag.getTagCreateTime(); + Duration timeRetained = tag.getTagTimeRetained(); + if (createTime == null || timeRetained == null) { + continue; + } + if (LocalDateTime.now().isAfter(createTime.plus(timeRetained))) { + LOG.info( + "Delete tag {}, because its existence time has reached its timeRetained of {}.", + tagName, + timeRetained); + tagManager.deleteTag(tagName, tagDeletion, snapshotManager, callbacks); + } + } + } + + public static TagTimeExpire create( + SnapshotManager snapshotManager, + TagManager tagManager, + TagDeletion tagDeletion, + List callbacks) { + return new TagTimeExpire(snapshotManager, tagManager, tagDeletion, callbacks); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/JsonSerdeUtil.java b/paimon-core/src/main/java/org/apache/paimon/utils/JsonSerdeUtil.java index 676276a30e58..aa493dd4a90b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/JsonSerdeUtil.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/JsonSerdeUtil.java @@ -36,6 +36,7 @@ import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer; import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.module.SimpleModule; import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ser.std.StdSerializer; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; import javax.annotation.Nullable; @@ -57,6 +58,7 @@ public class JsonSerdeUtil { static { OBJECT_MAPPER_INSTANCE = new ObjectMapper(); OBJECT_MAPPER_INSTANCE.registerModule(createPaimonJacksonModule()); + OBJECT_MAPPER_INSTANCE.registerModule(new JavaTimeModule()); } public static LinkedHashMap parseJsonMap(String jsonString, Class valueType) { diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java index 174b4233cccc..dbbc8fffdc05 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java @@ -394,7 +394,10 @@ public List safelyGetAllSnapshots() throws IOException { List snapshots = new ArrayList<>(); for (Path path : paths) { - Snapshot.safelyFromPath(fileIO, path).ifPresent(snapshots::add); + Snapshot snapshot = Snapshot.safelyFromPath(fileIO, path); + if (snapshot != null) { + snapshots.add(snapshot); + } } return snapshots; diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java index 134dea459f02..8b7818fed782 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java @@ -26,11 +26,16 @@ import org.apache.paimon.manifest.ManifestEntry; import org.apache.paimon.operation.TagDeletion; import org.apache.paimon.table.sink.TagCallback; +import org.apache.paimon.tag.Tag; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; + import java.io.IOException; +import java.time.Duration; +import java.time.LocalDateTime; import java.util.ArrayList; import java.util.Collection; import java.util.Comparator; @@ -75,7 +80,11 @@ public Path branchTagPath(String branchName, String tagName) { } /** Create a tag from given snapshot and save it in the storage. */ - public void createTag(Snapshot snapshot, String tagName, List callbacks) { + public void createTag( + Snapshot snapshot, + String tagName, + @Nullable Duration timeRetained, + List callbacks) { checkArgument(!StringUtils.isBlank(tagName), "Tag name '%s' is blank.", tagName); // skip create tag for the same snapshot of the same name. @@ -86,7 +95,13 @@ public void createTag(Snapshot snapshot, String tagName, List callb } else { Path newTagPath = tagPath(tagName); try { - fileIO.writeFileUtf8(newTagPath, snapshot.toJson()); + fileIO.writeFileUtf8( + newTagPath, + timeRetained != null + ? Tag.fromSnapshotAndTagTtl( + snapshot, timeRetained, LocalDateTime.now()) + .toJson() + : snapshot.toJson()); } catch (IOException e) { throw new RuntimeException( String.format( @@ -227,7 +242,8 @@ public boolean tagExists(String tagName) { /** Get the tagged snapshot by name. */ public Snapshot taggedSnapshot(String tagName) { checkArgument(tagExists(tagName), "Tag '%s' doesn't exist.", tagName); - return Snapshot.fromPath(fileIO, tagPath(tagName)); + // Trim to snapshot to avoid equals and compare snapshot. + return Tag.fromPath(fileIO, tagPath(tagName)).trimToSnapshot(); } public long tagCount() { @@ -243,7 +259,7 @@ public List taggedSnapshots() { return new ArrayList<>(tags().keySet()); } - /** Get all tagged snapshots with names sorted by snapshot id. */ + /** Get all tagged snapshots with tag names sorted by snapshot id. */ public SortedMap> tags() { return tags(tagName -> true); } @@ -276,11 +292,10 @@ public SortedMap> tags(Predicate filter) { } // If the tag file is not found, it might be deleted by // other processes, so just skip this tag - Snapshot.safelyFromPath(fileIO, path) - .ifPresent( - snapshot -> - tags.computeIfAbsent(snapshot, s -> new ArrayList<>()) - .add(tagName)); + Snapshot snapshot = Snapshot.safelyFromPath(fileIO, path); + if (snapshot != null) { + tags.computeIfAbsent(snapshot, s -> new ArrayList<>()).add(tagName); + } } } catch (IOException e) { throw new RuntimeException(e); @@ -288,6 +303,27 @@ public SortedMap> tags(Predicate filter) { return tags; } + /** Get all {@link Tag}s. */ + public List> tagObjects() { + try { + List paths = + listVersionedFileStatus(fileIO, tagDirectory(), TAG_PREFIX) + .map(FileStatus::getPath) + .collect(Collectors.toList()); + List> tags = new ArrayList<>(); + for (Path path : paths) { + String tagName = path.getName().substring(TAG_PREFIX.length()); + Tag tag = Tag.safelyFromPath(fileIO, path); + if (tag != null) { + tags.add(Pair.of(tag, tagName)); + } + } + return tags; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + public List sortTagsOfOneSnapshot(List tagNames) { return tagNames.stream() .map( diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java index de95819ad889..626c13c018fc 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java @@ -140,7 +140,11 @@ public void testMixedSnapshotAndTagDeletion() throws Exception { // create tags for each snapshot for (int id = 1; id <= latestSnapshotId; id++) { Snapshot snapshot = snapshotManager.snapshot(id); - tagManager.createTag(snapshot, "tag" + id, Collections.emptyList()); + tagManager.createTag( + snapshot, + "tag" + id, + store.options().tagDefaultTimeRetained(), + Collections.emptyList()); } // randomly expire snapshots diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java index 819b70d8a570..a9be8d3ac988 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java @@ -51,6 +51,7 @@ import java.nio.file.Files; import java.nio.file.Paths; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -262,7 +263,7 @@ public void testExpireWithExistingTags() throws Exception { // step 2: commit -A (by clean bucket 0) and create tag1 cleanBucket(store, gen.getPartition(gen.next()), 0); - createTag(snapshotManager.snapshot(2), "tag1"); + createTag(snapshotManager.snapshot(2), "tag1", store.options().tagDefaultTimeRetained()); assertThat(tagManager.tagExists("tag1")).isTrue(); // step 3: commit C to bucket 2 @@ -273,7 +274,7 @@ public void testExpireWithExistingTags() throws Exception { // step 4: commit -B (by clean bucket 1) and create tag2 cleanBucket(store, partition, 1); - createTag(snapshotManager.snapshot(4), "tag2"); + createTag(snapshotManager.snapshot(4), "tag2", store.options().tagDefaultTimeRetained()); assertThat(tagManager.tagExists("tag2")).isTrue(); // step 5: commit D to bucket 3 @@ -353,7 +354,7 @@ public void testExpireWithUpgradeAndTags() throws Exception { // snapshot 3: commit -A (by clean bucket 0) cleanBucket(store, gen.getPartition(gen.next()), 0); - createTag(snapshotManager.snapshot(1), "tag1"); + createTag(snapshotManager.snapshot(1), "tag1", store.options().tagDefaultTimeRetained()); store.newExpire(1, 1, Long.MAX_VALUE).expire(); // check data file and manifests @@ -410,7 +411,7 @@ public void testDeleteTagWithSnapshot() throws Exception { Arrays.asList(snapshot1.baseManifestList(), snapshot1.deltaManifestList()); // create tag1 - createTag(snapshot1, "tag1"); + createTag(snapshot1, "tag1", store.options().tagDefaultTimeRetained()); // expire snapshot 1, 2 store.newExpire(1, 1, Long.MAX_VALUE).expire(); @@ -485,9 +486,9 @@ public void testDeleteTagWithOtherTag() throws Exception { Arrays.asList(snapshot2.baseManifestList(), snapshot2.deltaManifestList()); // create tags - createTag(snapshotManager.snapshot(1), "tag1"); - createTag(snapshotManager.snapshot(2), "tag2"); - createTag(snapshotManager.snapshot(4), "tag3"); + createTag(snapshotManager.snapshot(1), "tag1", store.options().tagDefaultTimeRetained()); + createTag(snapshotManager.snapshot(2), "tag2", store.options().tagDefaultTimeRetained()); + createTag(snapshotManager.snapshot(4), "tag3", store.options().tagDefaultTimeRetained()); // expire snapshot 1, 2, 3, 4 store.newExpire(1, 1, Long.MAX_VALUE).expire(); @@ -735,7 +736,7 @@ private void cleanBucket(TestFileStore store, BinaryRow partition, int bucket) { null); } - private void createTag(Snapshot snapshot, String tagName) { - tagManager.createTag(snapshot, tagName, Collections.emptyList()); + private void createTag(Snapshot snapshot, String tagName, Duration timeRetained) { + tagManager.createTag(snapshot, tagName, timeRetained, Collections.emptyList()); } } diff --git a/paimon-core/src/test/java/org/apache/paimon/table/system/TagsTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/system/TagsTableTest.java index 51cfcd702f8c..4b381a6011a0 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/system/TagsTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/system/TagsTableTest.java @@ -18,7 +18,6 @@ package org.apache.paimon.table.system; -import org.apache.paimon.Snapshot; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.data.BinaryString; import org.apache.paimon.data.GenericRow; @@ -30,8 +29,10 @@ import org.apache.paimon.table.Table; import org.apache.paimon.table.TableTestBase; import org.apache.paimon.table.sink.TableCommitImpl; +import org.apache.paimon.tag.Tag; import org.apache.paimon.types.DataTypes; import org.apache.paimon.utils.DateTimeUtils; +import org.apache.paimon.utils.Pair; import org.apache.paimon.utils.TagManager; import org.junit.jupiter.api.BeforeEach; @@ -42,7 +43,6 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; -import java.util.Map; import java.util.function.Function; import static org.assertj.core.api.Assertions.assertThat; @@ -120,20 +120,26 @@ void testTagBranchesTable() throws Exception { private List getExceptedResult( Function> tagBranchesFunction) { List internalRows = new ArrayList<>(); - for (Map.Entry> tag : tagManager.tags().entrySet()) { - Snapshot snapshot = tag.getKey(); - for (String tagName : tag.getValue()) { - internalRows.add( - GenericRow.of( - BinaryString.fromString(tagName), - snapshot.id(), - snapshot.schemaId(), - Timestamp.fromLocalDateTime( - DateTimeUtils.toLocalDateTime(snapshot.timeMillis())), - snapshot.totalRecordCount(), - BinaryString.fromString( - tagBranchesFunction.apply(tagName).toString()))); - } + for (Pair snapshot : tagManager.tagObjects()) { + Tag tag = snapshot.getKey(); + String tagName = snapshot.getValue(); + internalRows.add( + GenericRow.of( + BinaryString.fromString(tagName), + tag.id(), + tag.schemaId(), + Timestamp.fromLocalDateTime( + DateTimeUtils.toLocalDateTime(tag.timeMillis())), + tag.totalRecordCount(), + BinaryString.fromString(tagBranchesFunction.apply(tagName).toString()), + Timestamp.fromLocalDateTime( + tag.getTagCreateTime() == null + ? LocalDateTime.MIN + : tag.getTagCreateTime()), + BinaryString.fromString( + tag.getTagTimeRetained() == null + ? "" + : tag.getTagTimeRetained().toString()))); } return internalRows; } diff --git a/paimon-core/src/test/java/org/apache/paimon/tag/TagAutoCreationTest.java b/paimon-core/src/test/java/org/apache/paimon/tag/TagAutoManagerTest.java similarity index 78% rename from paimon-core/src/test/java/org/apache/paimon/tag/TagAutoCreationTest.java rename to paimon-core/src/test/java/org/apache/paimon/tag/TagAutoManagerTest.java index f76a58e6cfd1..a065945bad01 100644 --- a/paimon-core/src/test/java/org/apache/paimon/tag/TagAutoCreationTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/tag/TagAutoManagerTest.java @@ -21,6 +21,7 @@ import org.apache.paimon.CoreOptions.TagCreationMode; import org.apache.paimon.CoreOptions.TagCreationPeriod; import org.apache.paimon.CoreOptions.TagPeriodFormatter; +import org.apache.paimon.Snapshot; import org.apache.paimon.catalog.PrimaryKeyTableTestBase; import org.apache.paimon.manifest.ManifestCommittable; import org.apache.paimon.options.Options; @@ -33,6 +34,7 @@ import java.time.Duration; import java.time.LocalDateTime; import java.time.ZoneId; +import java.util.Collections; import static org.apache.paimon.CoreOptions.SINK_WATERMARK_TIME_ZONE; import static org.apache.paimon.CoreOptions.SNAPSHOT_NUM_RETAINED_MAX; @@ -41,12 +43,13 @@ import static org.apache.paimon.CoreOptions.TAG_AUTOMATIC_CREATION; import static org.apache.paimon.CoreOptions.TAG_CREATION_DELAY; import static org.apache.paimon.CoreOptions.TAG_CREATION_PERIOD; +import static org.apache.paimon.CoreOptions.TAG_DEFAULT_TIME_RETAINED; import static org.apache.paimon.CoreOptions.TAG_NUM_RETAINED_MAX; import static org.apache.paimon.CoreOptions.TAG_PERIOD_FORMATTER; import static org.assertj.core.api.Assertions.assertThat; /** Test for tag automatic creation. */ -public class TagAutoCreationTest extends PrimaryKeyTableTestBase { +public class TagAutoManagerTest extends PrimaryKeyTableTestBase { @Test public void testTag() throws Exception { @@ -320,6 +323,104 @@ public void testWatermarkIdleTimeoutForceCreatingSnapshot() throws Exception { commit.close(); } + @Test + public void testAutoCreateTagNotExpiredByTimeRetained() throws Exception { + Options options = new Options(); + options.set(TAG_AUTOMATIC_CREATION, TagCreationMode.WATERMARK); + options.set(TAG_CREATION_PERIOD, TagCreationPeriod.HOURLY); + options.set(TAG_NUM_RETAINED_MAX, 3); + options.set(TAG_DEFAULT_TIME_RETAINED, Duration.ofMillis(500)); + FileStoreTable table = this.table.copy(options.toMap()); + TableCommitImpl commit = table.newCommit(commitUser).ignoreEmptyCommit(false); + TagManager tagManager = table.store().newTagManager(); + + // test normal creation + commit.commit(new ManifestCommittable(0, utcMills("2023-07-18T12:12:00"))); + commit.commit(new ManifestCommittable(1, utcMills("2023-07-18T14:00:00"))); + commit.commit(new ManifestCommittable(2, utcMills("2023-07-18T15:12:00"))); + commit.commit(new ManifestCommittable(3, utcMills("2023-07-18T16:00:00"))); + + // test expire old tag by time-retained + Thread.sleep(1000); + commit.commit(new ManifestCommittable(4, utcMills("2023-07-18T19:00:00"))); + assertThat(tagManager.allTagNames()).containsOnly("2023-07-18 18"); + + commit.close(); + } + + @Test + public void testExpireTagsByTimeRetained() throws Exception { + Options options = new Options(); + options.set(TAG_AUTOMATIC_CREATION, TagCreationMode.WATERMARK); + options.set(TAG_CREATION_PERIOD, TagCreationPeriod.HOURLY); + options.set(TAG_NUM_RETAINED_MAX, 3); + options.set(TAG_DEFAULT_TIME_RETAINED, Duration.ofMillis(500)); + FileStoreTable table = this.table.copy(options.toMap()); + TableCommitImpl commit = table.newCommit(commitUser).ignoreEmptyCommit(false); + TagManager tagManager = table.store().newTagManager(); + + // test normal creation + commit.commit(new ManifestCommittable(0, utcMills("2023-07-18T12:12:00"))); + commit.commit(new ManifestCommittable(1, utcMills("2023-07-18T14:00:00"))); + commit.commit(new ManifestCommittable(2, utcMills("2023-07-18T15:12:00"))); + commit.commit(new ManifestCommittable(3, utcMills("2023-07-18T16:00:00"))); + + Snapshot snapshot1 = + new Snapshot( + 4, + 0L, + null, + null, + null, + null, + null, + 0L, + Snapshot.CommitKind.APPEND, + 1000, + null, + null, + null, + null, + null, + null); + tagManager.createTag( + snapshot1, + "non-auto-create-tag-shoule-expire", + Duration.ofMillis(500), + Collections.emptyList()); + + Snapshot snapshot2 = + new Snapshot( + 5, + 0L, + null, + null, + null, + null, + null, + 0L, + Snapshot.CommitKind.APPEND, + 1000, + null, + null, + null, + null, + null, + null); + tagManager.createTag( + snapshot2, + "non-auto-create-tag-shoule-not-expire", + Duration.ofDays(1), + Collections.emptyList()); + + // test expire old tag by time-retained + Thread.sleep(1000); + commit.commit(new ManifestCommittable(6, utcMills("2023-07-18T19:00:00"))); + assertThat(tagManager.allTagNames()) + .containsOnly("2023-07-18 18", "non-auto-create-tag-shoule-not-expire"); + commit.close(); + } + private long localZoneMills(String timestamp) { return LocalDateTime.parse(timestamp) .atZone(ZoneId.systemDefault()) diff --git a/paimon-core/src/test/java/org/apache/paimon/tag/TagTest.java b/paimon-core/src/test/java/org/apache/paimon/tag/TagTest.java new file mode 100644 index 000000000000..3198366dd3e5 --- /dev/null +++ b/paimon-core/src/test/java/org/apache/paimon/tag/TagTest.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.tag; + +import org.apache.paimon.Snapshot; + +import org.junit.Assert; +import org.junit.Test; + +import java.time.Duration; +import java.time.LocalDateTime; + +/** Test for {@link Tag}. */ +public class TagTest { + + private final Snapshot snapshot = + new Snapshot( + 0, + 0L, + null, + null, + null, + null, + null, + 0L, + Snapshot.CommitKind.APPEND, + 1000, + null, + null, + null, + null, + null, + null); + + @Test + public void testFromJson() { + Tag tag = Tag.fromJson(snapshot.toJson()); + Assert.assertEquals( + "{\n" + + " \"version\" : 3,\n" + + " \"id\" : 0,\n" + + " \"schemaId\" : 0,\n" + + " \"baseManifestList\" : null,\n" + + " \"deltaManifestList\" : null,\n" + + " \"changelogManifestList\" : null,\n" + + " \"commitUser\" : null,\n" + + " \"commitIdentifier\" : 0,\n" + + " \"commitKind\" : \"APPEND\",\n" + + " \"timeMillis\" : 1000,\n" + + " \"totalRecordCount\" : null,\n" + + " \"deltaRecordCount\" : null\n" + + "}", + tag.toJson()); + } + + @Test + public void testFromSnapshotAndTagTtl() { + Tag tag = + Tag.fromSnapshotAndTagTtl( + snapshot, + Duration.ofSeconds(5), + LocalDateTime.of(1969, 1, 1, 0, 0, 0, 123456789)); + String tagJson = tag.toJson(); + Assert.assertEquals( + "{\n" + + " \"version\" : 3,\n" + + " \"id\" : 0,\n" + + " \"schemaId\" : 0,\n" + + " \"baseManifestList\" : null,\n" + + " \"deltaManifestList\" : null,\n" + + " \"changelogManifestList\" : null,\n" + + " \"commitUser\" : null,\n" + + " \"commitIdentifier\" : 0,\n" + + " \"commitKind\" : \"APPEND\",\n" + + " \"timeMillis\" : 1000,\n" + + " \"totalRecordCount\" : null,\n" + + " \"deltaRecordCount\" : null,\n" + + " \"tagCreateTime\" : [ 1969, 1, 1, 0, 0, 0, 123456789 ],\n" + + " \"tagTimeRetained\" : 5.000000000\n" + + "}", + tagJson); + + Tag newTag = Tag.fromJson(tagJson); + Assert.assertEquals(tag, newTag); + } +} diff --git a/paimon-core/src/test/java/org/apache/paimon/utils/TagManagerTest.java b/paimon-core/src/test/java/org/apache/paimon/utils/TagManagerTest.java new file mode 100644 index 000000000000..3e702b9b2cd0 --- /dev/null +++ b/paimon-core/src/test/java/org/apache/paimon/utils/TagManagerTest.java @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.utils; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.KeyValue; +import org.apache.paimon.Snapshot; +import org.apache.paimon.TestFileStore; +import org.apache.paimon.TestKeyValueGenerator; +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.fs.FileIO; +import org.apache.paimon.fs.Path; +import org.apache.paimon.fs.local.LocalFileIO; +import org.apache.paimon.mergetree.compact.DeduplicateMergeFunction; +import org.apache.paimon.operation.FileStoreTestUtils; +import org.apache.paimon.schema.Schema; +import org.apache.paimon.schema.SchemaManager; +import org.apache.paimon.schema.TableSchema; +import org.apache.paimon.tag.Tag; +import org.apache.paimon.types.RowType; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.time.Duration; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ThreadLocalRandom; + +import static org.apache.paimon.operation.FileStoreTestUtils.commitData; +import static org.apache.paimon.operation.FileStoreTestUtils.partitionedData; +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for TagManager. */ +public class TagManagerTest { + + @TempDir java.nio.file.Path tempDir; + + private final FileIO fileIO = new LocalFileIO(); + + private long commitIdentifier; + private String root; + private TagManager tagManager; + + @BeforeEach + public void setup() throws Exception { + commitIdentifier = 0L; + root = tempDir.toString(); + tagManager = null; + } + + @Test + public void testCreateTagWithoutTimeRetained() throws Exception { + TestFileStore store = createStore(TestKeyValueGenerator.GeneratorMode.NON_PARTITIONED, 4); + tagManager = new TagManager(fileIO, store.options().path()); + SnapshotManager snapshotManager = store.snapshotManager(); + TestKeyValueGenerator gen = + new TestKeyValueGenerator(TestKeyValueGenerator.GeneratorMode.NON_PARTITIONED); + BinaryRow partition = gen.getPartition(gen.next()); + + // commit A to bucket 0 and B to bucket 1 + Map>> writers = new HashMap<>(); + for (int bucket : Arrays.asList(0, 1)) { + List kvs = partitionedData(5, gen); + writeData(store, kvs, partition, bucket, writers); + } + commitData(store, commitIdentifier++, writers); + + tagManager.createTag( + snapshotManager.snapshot(1), + "tag", + store.options().tagDefaultTimeRetained(), + Collections.emptyList()); + assertThat(tagManager.tagExists("tag")).isTrue(); + Snapshot snapshot = tagManager.taggedSnapshot("tag"); + String snapshotJson = snapshot.toJson(); + Assertions.assertTrue( + !snapshotJson.contains("tagCreateTime") + && !snapshotJson.contains("tagTimeRetained")); + Assertions.assertEquals(snapshot, Snapshot.fromJson(snapshotJson)); + } + + @Test + public void testCreateTagWithTimeRetained() throws Exception { + TestFileStore store = createStore(TestKeyValueGenerator.GeneratorMode.NON_PARTITIONED, 4); + tagManager = new TagManager(fileIO, store.options().path()); + SnapshotManager snapshotManager = store.snapshotManager(); + TestKeyValueGenerator gen = + new TestKeyValueGenerator(TestKeyValueGenerator.GeneratorMode.NON_PARTITIONED); + BinaryRow partition = gen.getPartition(gen.next()); + + // commit A to bucket 0 and B to bucket 1 + Map>> writers = new HashMap<>(); + for (int bucket : Arrays.asList(0, 1)) { + List kvs = partitionedData(5, gen); + writeData(store, kvs, partition, bucket, writers); + } + commitData(store, commitIdentifier++, writers); + + tagManager.createTag( + snapshotManager.snapshot(1), "tag", Duration.ofDays(1), Collections.emptyList()); + assertThat(tagManager.tagExists("tag")).isTrue(); + List> tags = tagManager.tagObjects(); + Assertions.assertEquals(1, tags.size()); + Tag tag = tags.get(0).getKey(); + String tagJson = tag.toJson(); + Assertions.assertTrue( + tagJson.contains("tagCreateTime") && tagJson.contains("tagTimeRetained")); + Assertions.assertEquals(tag, Tag.fromJson(tagJson)); + assertThat(tags.get(0).getValue()).contains("tag"); + } + + private TestFileStore createStore(TestKeyValueGenerator.GeneratorMode mode, int buckets) + throws Exception { + ThreadLocalRandom random = ThreadLocalRandom.current(); + + CoreOptions.ChangelogProducer changelogProducer; + if (random.nextBoolean()) { + changelogProducer = CoreOptions.ChangelogProducer.INPUT; + } else { + changelogProducer = CoreOptions.ChangelogProducer.NONE; + } + + RowType rowType, partitionType; + switch (mode) { + case NON_PARTITIONED: + rowType = TestKeyValueGenerator.NON_PARTITIONED_ROW_TYPE; + partitionType = TestKeyValueGenerator.NON_PARTITIONED_PART_TYPE; + break; + case SINGLE_PARTITIONED: + rowType = TestKeyValueGenerator.SINGLE_PARTITIONED_ROW_TYPE; + partitionType = TestKeyValueGenerator.SINGLE_PARTITIONED_PART_TYPE; + break; + case MULTI_PARTITIONED: + rowType = TestKeyValueGenerator.DEFAULT_ROW_TYPE; + partitionType = TestKeyValueGenerator.DEFAULT_PART_TYPE; + break; + default: + throw new UnsupportedOperationException("Unsupported generator mode: " + mode); + } + + SchemaManager schemaManager = new SchemaManager(fileIO, new Path(root)); + TableSchema tableSchema = + schemaManager.createTable( + new Schema( + rowType.getFields(), + partitionType.getFieldNames(), + TestKeyValueGenerator.getPrimaryKeys(mode), + Collections.emptyMap(), + null)); + + return new TestFileStore.Builder( + "avro", + root, + buckets, + partitionType, + TestKeyValueGenerator.KEY_TYPE, + rowType, + TestKeyValueGenerator.TestKeyValueFieldsExtractor.EXTRACTOR, + DeduplicateMergeFunction.factory(), + tableSchema) + .changelogProducer(changelogProducer) + .build(); + } + + private void writeData( + TestFileStore store, + List kvs, + BinaryRow partition, + int bucket, + Map>> writers) + throws Exception { + writers.computeIfAbsent(partition, p -> new HashMap<>()) + .put(bucket, FileStoreTestUtils.writeData(store, kvs, partition, bucket)); + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagAction.java index 0e500c3e7488..cfc9b558b40c 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagAction.java @@ -18,13 +18,17 @@ package org.apache.paimon.flink.action; +import javax.annotation.Nullable; + +import java.time.Duration; import java.util.Map; /** Create tag action for Flink. */ public class CreateTagAction extends TableActionBase { private final String tagName; - private final Long snapshotId; + private final @Nullable Long snapshotId; + private final @Nullable Duration timeRetained; public CreateTagAction( String warehouse, @@ -32,18 +36,20 @@ public CreateTagAction( String tableName, Map catalogConfig, String tagName, - Long snapshotId) { + @Nullable Long snapshotId, + @Nullable Duration timeRetained) { super(warehouse, databaseName, tableName, catalogConfig); this.tagName = tagName; + this.timeRetained = timeRetained; this.snapshotId = snapshotId; } @Override public void run() throws Exception { if (snapshotId == null) { - table.createTag(tagName); + table.createTag(tagName, timeRetained); } else { - table.createTag(tagName, snapshotId); + table.createTag(tagName, snapshotId, timeRetained); } } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagActionFactory.java index 01bbdba42abf..7769fa1d792f 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagActionFactory.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagActionFactory.java @@ -18,8 +18,11 @@ package org.apache.paimon.flink.action; +import org.apache.paimon.utils.TimeUtils; + import org.apache.flink.api.java.tuple.Tuple3; +import java.time.Duration; import java.util.Map; import java.util.Optional; @@ -30,6 +33,7 @@ public class CreateTagActionFactory implements ActionFactory { private static final String TAG_NAME = "tag_name"; private static final String SNAPSHOT = "snapshot"; + private static final String TIME_RETAINED = "time_retained"; @Override public String identifier() { @@ -49,9 +53,20 @@ public Optional create(MultipleParameterToolAdapter params) { snapshot = Long.parseLong(params.get(SNAPSHOT)); } + Duration timeRetained = null; + if (params.has(TIME_RETAINED)) { + timeRetained = TimeUtils.parseDuration(params.get(TIME_RETAINED)); + } + CreateTagAction action = new CreateTagAction( - tablePath.f0, tablePath.f1, tablePath.f2, catalogConfig, tagName, snapshot); + tablePath.f0, + tablePath.f1, + tablePath.f2, + catalogConfig, + tagName, + snapshot, + timeRetained); return Optional.of(action); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateTagProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateTagProcedure.java index 9999fdf11776..1a7b03ef6512 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateTagProcedure.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateTagProcedure.java @@ -21,16 +21,19 @@ import org.apache.paimon.catalog.Catalog; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.table.Table; +import org.apache.paimon.utils.TimeUtils; import org.apache.flink.table.procedure.ProcedureContext; import javax.annotation.Nullable; +import java.time.Duration; + /** * Create tag procedure. Usage: * *

- *  CALL sys.create_tag('tableId', 'tagName', snapshotId)
+ *  CALL sys.create_tag('tableId', 'tagName', snapshotId, 'timeRetained')
  * 
*/ public class CreateTagProcedure extends ProcedureBase { @@ -40,25 +43,54 @@ public class CreateTagProcedure extends ProcedureBase { public String[] call( ProcedureContext procedureContext, String tableId, String tagName, long snapshotId) throws Catalog.TableNotExistException { - return innerCall(tableId, tagName, snapshotId); + return innerCall(tableId, tagName, snapshotId, null); } public String[] call(ProcedureContext procedureContext, String tableId, String tagName) throws Catalog.TableNotExistException { - return innerCall(tableId, tagName, null); + return innerCall(tableId, tagName, null, null); } - private String[] innerCall(String tableId, String tagName, @Nullable Long snapshotId) + public String[] call( + ProcedureContext procedureContext, + String tableId, + String tagName, + long snapshotId, + String timeRetained) + throws Catalog.TableNotExistException { + return innerCall(tableId, tagName, snapshotId, timeRetained); + } + + public String[] call( + ProcedureContext procedureContext, String tableId, String tagName, String timeRetained) + throws Catalog.TableNotExistException { + return innerCall(tableId, tagName, null, timeRetained); + } + + private String[] innerCall( + String tableId, + String tagName, + @Nullable Long snapshotId, + @Nullable String timeRetained) throws Catalog.TableNotExistException { Table table = catalog.getTable(Identifier.fromString(tableId)); if (snapshotId == null) { - table.createTag(tagName); + table.createTag(tagName, toDuration(timeRetained)); } else { - table.createTag(tagName, snapshotId); + table.createTag(tagName, snapshotId, toDuration(timeRetained)); } return new String[] {"Success"}; } + @Nullable + private static Duration toDuration(@Nullable String s) { + if (s == null) { + return null; + } + + return TimeUtils.parseDuration(s); + } + @Override public String identifier() { return IDENTIFIER; diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperator.java index dcf2c8b0045c..6d27c6019483 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperator.java @@ -46,6 +46,7 @@ import org.apache.flink.streaming.runtime.tasks.StreamTask; import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus; +import java.time.Duration; import java.util.ArrayList; import java.util.List; import java.util.NavigableSet; @@ -76,6 +77,8 @@ public class AutoTagForSavepointCommitterOperator private final NavigableSet identifiersForTags; + private final Duration tagTimeRetained; + private transient SnapshotManager snapshotManager; private transient TagManager tagManager; @@ -91,13 +94,15 @@ public AutoTagForSavepointCommitterOperator( SerializableSupplier snapshotManagerFactory, SerializableSupplier tagManagerFactory, SerializableSupplier tagDeletionFactory, - SerializableSupplier> callbacksSupplier) { + SerializableSupplier> callbacksSupplier, + Duration tagTimeRetained) { this.commitOperator = commitOperator; this.tagManagerFactory = tagManagerFactory; this.snapshotManagerFactory = snapshotManagerFactory; this.tagDeletionFactory = tagDeletionFactory; this.callbacksSupplier = callbacksSupplier; this.identifiersForTags = new TreeSet<>(); + this.tagTimeRetained = tagTimeRetained; } @Override @@ -167,7 +172,7 @@ private void createTagForIdentifiers(List identifiers) { for (Snapshot snapshot : snapshotForTags) { String tagName = SAVEPOINT_TAG_PREFIX + snapshot.commitIdentifier(); if (!tagManager.tagExists(tagName)) { - tagManager.createTag(snapshot, tagName, callbacks); + tagManager.createTag(snapshot, tagName, tagTimeRetained, callbacks); } } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperator.java index 2c898831ec2c..23202b45077f 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperator.java @@ -118,7 +118,11 @@ private void createTag() { tagName, tagDeletion, snapshotManager, table.store().createTagCallbacks()); } // Create a new tag - tagManager.createTag(snapshot, tagName, table.store().createTagCallbacks()); + tagManager.createTag( + snapshot, + tagName, + table.coreOptions().tagDefaultTimeRetained(), + table.store().createTagCallbacks()); // Expire the tag expireTag(); } catch (Exception e) { diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java index f7ebf5afdf24..fa4526897bcd 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java @@ -238,7 +238,8 @@ protected DataStreamSink doCommit(DataStream written, String com table::snapshotManager, table::tagManager, () -> table.store().newTagDeletion(), - () -> table.store().createTagCallbacks()); + () -> table.store().createTagCallbacks(), + table.coreOptions().tagDefaultTimeRetained()); } if (conf.get(ExecutionOptions.RUNTIME_MODE) == RuntimeExecutionMode.BATCH && table.coreOptions().tagCreationMode() == TagCreationMode.BATCH) { diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java index 8d445ab95b07..7a6361657f59 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java @@ -36,6 +36,7 @@ /** IT cases for branch management actions. */ class BranchActionITCase extends ActionITCaseBase { + @Test void testCreateAndDeleteBranch() throws Exception { @@ -63,7 +64,8 @@ void testCreateAndDeleteBranch() throws Exception { TagManager tagManager = new TagManager(table.fileIO(), table.location()); callProcedure( - String.format("CALL sys.create_tag('%s.%s', 'tag2', 2)", database, tableName)); + String.format( + "CALL sys.create_tag('%s.%s', 'tag2', 2, '5 d')", database, tableName)); assertThat(tagManager.tagExists("tag2")).isTrue(); BranchManager branchManager = table.branchManager(); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/TagActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/TagActionITCase.java index a01849ed94ca..1fda583d2de3 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/TagActionITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/TagActionITCase.java @@ -153,7 +153,7 @@ public void testCreateLatestTag() throws Exception { .run(); } else { callProcedure( - String.format("CALL sys.create_tag('%s.%s', 'tag2', 2)", database, tableName)); + String.format("CALL sys.create_tag('%s.%s', 'tag2', 2)", database, tableName)); } assertThat(tagManager.tagExists("tag2")).isTrue(); } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperatorTest.java index 0dbde05795fd..3b58c24d16b1 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperatorTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperatorTest.java @@ -208,7 +208,8 @@ protected OneInputStreamOperator createCommitterOperat table::snapshotManager, table::tagManager, () -> table.store().newTagDeletion(), - () -> table.store().createTagCallbacks()); + () -> table.store().createTagCallbacks(), + table.store().options().tagDefaultTimeRetained()); } @Override @@ -224,6 +225,7 @@ protected OneInputStreamOperator createCommitterOperat table::snapshotManager, table::tagManager, () -> table.store().newTagDeletion(), - () -> table.store().createTagCallbacks()); + () -> table.store().createTagCallbacks(), + table.store().options().tagDefaultTimeRetained()); } } diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateTagProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateTagProcedure.java index 7f9bdb62108b..b3f863c5e305 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateTagProcedure.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateTagProcedure.java @@ -18,6 +18,8 @@ package org.apache.paimon.spark.procedure; +import org.apache.paimon.utils.TimeUtils; + import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.connector.catalog.Identifier; import org.apache.spark.sql.connector.catalog.TableCatalog; @@ -26,6 +28,8 @@ import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; +import java.time.Duration; + import static org.apache.spark.sql.types.DataTypes.LongType; import static org.apache.spark.sql.types.DataTypes.StringType; @@ -36,7 +40,8 @@ public class CreateTagProcedure extends BaseProcedure { new ProcedureParameter[] { ProcedureParameter.required("table", StringType), ProcedureParameter.required("tag", StringType), - ProcedureParameter.optional("snapshot", LongType) + ProcedureParameter.optional("snapshot", LongType), + ProcedureParameter.optional("time_retained", StringType) }; private static final StructType OUTPUT_TYPE = @@ -64,14 +69,16 @@ public InternalRow[] call(InternalRow args) { Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name()); String tag = args.getString(1); Long snapshot = args.isNullAt(2) ? null : args.getLong(2); + Duration timeRetained = + args.isNullAt(3) ? null : TimeUtils.parseDuration(args.getString(3)); return modifyPaimonTable( tableIdent, table -> { if (snapshot == null) { - table.createTag(tag); + table.createTag(tag, timeRetained); } else { - table.createTag(tag, snapshot); + table.createTag(tag, snapshot, timeRetained); } InternalRow outputRow = newInternalRow(true); return new InternalRow[] {outputRow}; diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CreateAndDeleteTagProcedureTest.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CreateAndDeleteTagProcedureTest.scala index e505ae110033..3c9531a8a7d3 100644 --- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CreateAndDeleteTagProcedureTest.scala +++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CreateAndDeleteTagProcedureTest.scala @@ -70,7 +70,8 @@ class CreateAndDeleteTagProcedureTest extends PaimonSparkTestBase with StreamTes checkAnswer(query(), Row(1, "a") :: Row(2, "b2") :: Nil) checkAnswer( spark.sql( - "CALL paimon.sys.create_tag(table => 'test.T', tag => 'test_tag', snapshot => 2)"), + "CALL paimon.sys.create_tag(" + + "table => 'test.T', tag => 'test_tag', time_retained => '5 d', snapshot => 2)"), Row(true) :: Nil) checkAnswer( spark.sql("SELECT tag_name FROM paimon.test.`T$tags`"),