diff --git a/docs/content/maintenance/manage-tags.md b/docs/content/maintenance/manage-tags.md index fc499f607173e..26b657ebbd46b 100644 --- a/docs/content/maintenance/manage-tags.md +++ b/docs/content/maintenance/manage-tags.md @@ -108,6 +108,7 @@ You can create a tag with given name and snapshot ID. --database \ --table \ --tag_name \ + --time_retained \ [--snapshot ] \ [--catalog_conf [--catalog_conf ...]] ``` @@ -126,6 +127,7 @@ public class CreateTag { public static void main(String[] args) { Table table = ...; table.createTag("my-tag", 1); + table.createTag("my-tag-retained-12-hours", Duration.ofHours(12), 1); } } ``` @@ -138,6 +140,11 @@ Run the following sql: CALL create_tag(table => 'test.t', tag => 'test_tag', snapshot => 2); ``` +To create a tag with retained 1 day, run the following sql: +```sql +CALL create_tag(table => 'test.t', tag => 'test_tag', time_retained => '1 d', snapshot => 2); +``` + To create a tag based on the latest snapshot id, run the following sql: ```sql CALL create_tag(table => 'test.t', tag => 'test_tag'); diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html index 5959b6b4f0961..ec783ca294b2a 100644 --- a/docs/layouts/shortcodes/generated/core_configuration.html +++ b/docs/layouts/shortcodes/generated/core_configuration.html @@ -672,6 +672,12 @@

Enum

The date format for tag periods.

Possible values:
  • "with_dashes": Dates and hours with dashes, e.g., 'yyyy-MM-dd HH'
  • "without_dashes": Dates and hours without dashes, e.g., 'yyyyMMdd HH'
+ +
tag.time-retained
+ 3153600000000 ms + Duration + The maximum time retained for all tags. +
target-file-size
128 mb diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java index cbe5f45c5b508..e88e93d7caf2c 100644 --- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java @@ -70,6 +70,8 @@ public class CoreOptions implements Serializable { public static final String DISTINCT = "distinct"; + public static final Duration DEFAULT_TAG_TIME_RETAINED = Duration.ofDays(36500); + public static final ConfigOption BUCKET = key("bucket") .intType() @@ -989,6 +991,12 @@ public class CoreOptions implements Serializable { .noDefaultValue() .withDescription("The maximum number of tags to retain."); + public static final ConfigOption TAG_TIME_RETAINED = + key("tag.time-retained") + .durationType() + .defaultValue(DEFAULT_TAG_TIME_RETAINED) + .withDescription("The maximum time retained for all tags."); + public static final ConfigOption SNAPSHOT_WATERMARK_IDLE_TIMEOUT = key("snapshot.watermark-idle-timeout") .durationType() @@ -1617,6 +1625,10 @@ public Integer tagNumRetainedMax() { return options.get(TAG_NUM_RETAINED_MAX); } + public Duration tagTimeRetained() { + return options.get(TAG_TIME_RETAINED); + } + public Duration snapshotWatermarkIdleTimeout() { return options.get(SNAPSHOT_WATERMARK_IDLE_TIMEOUT); } diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java index 87cc4e65c5442..b7830f0ace515 100644 --- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java @@ -41,7 +41,7 @@ import org.apache.paimon.table.CatalogEnvironment; import org.apache.paimon.table.sink.CallbackUtils; import org.apache.paimon.table.sink.TagCallback; -import org.apache.paimon.tag.TagAutoCreation; +import org.apache.paimon.tag.TagAutoManager; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.FileStorePathFactory; import org.apache.paimon.utils.SegmentsCache; @@ -249,8 +249,8 @@ public PartitionExpire newPartitionExpire(String commitUser) { @Override @Nullable - public TagAutoCreation newTagCreationManager() { - return TagAutoCreation.create( + public TagAutoManager newTagCreationManager() { + return TagAutoManager.create( options, snapshotManager(), newTagManager(), diff --git a/paimon-core/src/main/java/org/apache/paimon/FileStore.java b/paimon-core/src/main/java/org/apache/paimon/FileStore.java index cd38d20611fcf..ebc0eee96f216 100644 --- a/paimon-core/src/main/java/org/apache/paimon/FileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/FileStore.java @@ -33,7 +33,7 @@ import org.apache.paimon.stats.StatsFileHandler; import org.apache.paimon.table.BucketMode; import org.apache.paimon.table.sink.TagCallback; -import org.apache.paimon.tag.TagAutoCreation; +import org.apache.paimon.tag.TagAutoManager; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.FileStorePathFactory; import org.apache.paimon.utils.SnapshotManager; @@ -93,7 +93,7 @@ public interface FileStore extends Serializable { PartitionExpire newPartitionExpire(String commitUser); @Nullable - TagAutoCreation newTagCreationManager(); + TagAutoManager newTagCreationManager(); ServiceManager newServiceManager(); diff --git a/paimon-core/src/main/java/org/apache/paimon/Snapshot.java b/paimon-core/src/main/java/org/apache/paimon/Snapshot.java index 33ea8f4bb7cab..14c2bdb2a637e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/Snapshot.java +++ b/paimon-core/src/main/java/org/apache/paimon/Snapshot.java @@ -29,6 +29,7 @@ import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator; import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties; import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonInclude; import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty; @@ -64,6 +65,7 @@ * there is no compatibility issue. * */ +@JsonIgnoreProperties(ignoreUnknown = true) public class Snapshot { public static final long FIRST_SNAPSHOT_ID = 1; @@ -93,37 +95,37 @@ public class Snapshot { // null for paimon <= 0.2 @JsonProperty(FIELD_VERSION) @Nullable - private final Integer version; + protected final Integer version; @JsonProperty(FIELD_ID) - private final long id; + protected final long id; @JsonProperty(FIELD_SCHEMA_ID) - private final long schemaId; + protected final long schemaId; // a manifest list recording all changes from the previous snapshots @JsonProperty(FIELD_BASE_MANIFEST_LIST) - private final String baseManifestList; + protected final String baseManifestList; // a manifest list recording all new changes occurred in this snapshot // for faster expire and streaming reads @JsonProperty(FIELD_DELTA_MANIFEST_LIST) - private final String deltaManifestList; + protected final String deltaManifestList; // a manifest list recording all changelog produced in this snapshot // null if no changelog is produced, or for paimon <= 0.2 @JsonProperty(FIELD_CHANGELOG_MANIFEST_LIST) @Nullable - private final String changelogManifestList; + protected final String changelogManifestList; // a manifest recording all index files of this table // null if no index file @JsonProperty(FIELD_INDEX_MANIFEST) @JsonInclude(JsonInclude.Include.NON_NULL) - private final String indexManifest; + protected final String indexManifest; @JsonProperty(FIELD_COMMIT_USER) - private final String commitUser; + protected final String commitUser; // Mainly for snapshot deduplication. // @@ -133,34 +135,34 @@ public class Snapshot { // If snapshot A has a smaller commitIdentifier than snapshot B, then snapshot A must be // committed before snapshot B, and thus snapshot A must contain older records than snapshot B. @JsonProperty(FIELD_COMMIT_IDENTIFIER) - private final long commitIdentifier; + protected final long commitIdentifier; @JsonProperty(FIELD_COMMIT_KIND) - private final CommitKind commitKind; + protected final CommitKind commitKind; @JsonProperty(FIELD_TIME_MILLIS) - private final long timeMillis; + protected final long timeMillis; @JsonProperty(FIELD_LOG_OFFSETS) - private final Map logOffsets; + protected final Map logOffsets; // record count of all changes occurred in this snapshot // null for paimon <= 0.3 @JsonProperty(FIELD_TOTAL_RECORD_COUNT) @Nullable - private final Long totalRecordCount; + protected final Long totalRecordCount; // record count of all new changes occurred in this snapshot // null for paimon <= 0.3 @JsonProperty(FIELD_DELTA_RECORD_COUNT) @Nullable - private final Long deltaRecordCount; + protected final Long deltaRecordCount; // record count of all changelog produced in this snapshot // null for paimon <= 0.3 @JsonProperty(FIELD_CHANGELOG_RECORD_COUNT) @Nullable - private final Long changelogRecordCount; + protected final Long changelogRecordCount; // watermark for input records // null for paimon <= 0.3 @@ -168,14 +170,14 @@ public class Snapshot { // watermark @JsonProperty(FIELD_WATERMARK) @Nullable - private final Long watermark; + protected final Long watermark; // stats file name for statistics of this table // null if no stats file @JsonInclude(JsonInclude.Include.NON_NULL) @JsonProperty(FIELD_STATISTICS) @Nullable - private final String statistics; + protected final String statistics; public Snapshot( long id, diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java index 99ad605df8ef8..5b69324bf7ff1 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java @@ -59,6 +59,7 @@ import java.io.IOException; import java.io.UncheckedIOException; +import java.time.Duration; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -443,8 +444,7 @@ public void rollbackTo(long snapshotId) { rollbackHelper().cleanLargerThan(snapshotManager.snapshot(snapshotId)); } - @Override - public void createTag(String tagName, long fromSnapshotId) { + public Snapshot createTagInternal(long fromSnapshotId) { SnapshotManager snapshotManager = snapshotManager(); Snapshot snapshot = null; if (snapshotManager.snapshotExists(fromSnapshotId)) { @@ -464,18 +464,35 @@ public void createTag(String tagName, long fromSnapshotId) { snapshot != null, "Cannot create tag because given snapshot #%s doesn't exist.", fromSnapshotId); - createTag(tagName, snapshot); + return snapshot; + } + + @Override + public void createTag(String tagName, long fromSnapshotId) { + createTag(tagName, coreOptions().tagTimeRetained(), createTagInternal(fromSnapshotId)); + } + + @Override + public void createTag(String tagName, Duration timeRetained, long fromSnapshotId) { + createTag(tagName, timeRetained, createTagInternal(fromSnapshotId)); } @Override public void createTag(String tagName) { Snapshot latestSnapshot = snapshotManager().latestSnapshot(); checkNotNull(latestSnapshot, "Cannot create tag because latest snapshot doesn't exist."); - createTag(tagName, latestSnapshot); + createTag(tagName, coreOptions().tagTimeRetained(), latestSnapshot); + } + + @Override + public void createTag(String tagName, Duration timeRetained) { + Snapshot latestSnapshot = snapshotManager().latestSnapshot(); + checkNotNull(latestSnapshot, "Cannot create tag because latest snapshot doesn't exist."); + createTag(tagName, timeRetained, latestSnapshot); } - private void createTag(String tagName, Snapshot fromSnapshot) { - tagManager().createTag(fromSnapshot, tagName, store().createTagCallbacks()); + private void createTag(String tagName, Duration timeRetained, Snapshot fromSnapshot) { + tagManager().createTag(fromSnapshot, tagName, timeRetained, store().createTagCallbacks()); } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java b/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java index d07035f2651e3..2119137ee8e2f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java @@ -18,6 +18,7 @@ package org.apache.paimon.table; +import org.apache.paimon.annotation.Experimental; import org.apache.paimon.stats.Statistics; import org.apache.paimon.table.sink.BatchWriteBuilder; import org.apache.paimon.table.sink.InnerTableCommit; @@ -25,6 +26,7 @@ import org.apache.paimon.table.sink.StreamWriteBuilder; import org.apache.paimon.table.source.InnerStreamTableScan; +import java.time.Duration; import java.util.Collections; import java.util.List; import java.util.Map; @@ -109,6 +111,14 @@ default void createTag(String tagName, long fromSnapshotId) { this.getClass().getSimpleName())); } + @Experimental + default void createTag(String tagName, Duration timeRetained, long fromSnapshotId) { + throw new UnsupportedOperationException( + String.format( + "Readonly Table %s does not support createTag.", + this.getClass().getSimpleName())); + } + @Override default void createTag(String tagName) { throw new UnsupportedOperationException( @@ -117,6 +127,14 @@ default void createTag(String tagName) { this.getClass().getSimpleName())); } + @Experimental + default void createTag(String tagName, Duration timeRetained) { + throw new UnsupportedOperationException( + String.format( + "Readonly Table %s does not support createTag.", + this.getClass().getSimpleName())); + } + @Override default void deleteTag(String tagName) { throw new UnsupportedOperationException( diff --git a/paimon-core/src/main/java/org/apache/paimon/table/Table.java b/paimon-core/src/main/java/org/apache/paimon/table/Table.java index 3ed6a1990e5fe..ae0be3115b6f6 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/Table.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/Table.java @@ -27,6 +27,7 @@ import org.apache.paimon.types.RowType; import java.io.Serializable; +import java.time.Duration; import java.util.List; import java.util.Map; import java.util.Optional; @@ -76,10 +77,16 @@ public interface Table extends Serializable { @Experimental void createTag(String tagName, long fromSnapshotId); + @Experimental + void createTag(String tagName, Duration timeRetained, long fromSnapshotId); + /** Create a tag from latest snapshot. */ @Experimental void createTag(String tagName); + @Experimental + void createTag(String tagName, Duration timeRetained); + /** Delete a tag by name. */ @Experimental void deleteTag(String tagName); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java index c63511ac46522..9d471ee5761f6 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java @@ -31,7 +31,7 @@ import org.apache.paimon.operation.Lock; import org.apache.paimon.operation.PartitionExpire; import org.apache.paimon.operation.metrics.CommitMetrics; -import org.apache.paimon.tag.TagAutoCreation; +import org.apache.paimon.tag.TagAutoManager; import org.apache.paimon.utils.ExecutorThreadFactory; import org.apache.paimon.utils.FileUtils; import org.apache.paimon.utils.IOUtils; @@ -76,7 +76,7 @@ public class TableCommitImpl implements InnerTableCommit { private final List commitCallbacks; @Nullable private final Runnable expireSnapshots; @Nullable private final PartitionExpire partitionExpire; - @Nullable private final TagAutoCreation tagAutoCreation; + @Nullable private final TagAutoManager tagAutoManager; private final Lock lock; @Nullable private final Duration consumerExpireTime; @@ -96,7 +96,7 @@ public TableCommitImpl( List commitCallbacks, @Nullable Runnable expireSnapshots, @Nullable PartitionExpire partitionExpire, - @Nullable TagAutoCreation tagAutoCreation, + @Nullable TagAutoManager tagAutoManager, Lock lock, @Nullable Duration consumerExpireTime, ConsumerManager consumerManager, @@ -112,7 +112,7 @@ public TableCommitImpl( this.commitCallbacks = commitCallbacks; this.expireSnapshots = expireSnapshots; this.partitionExpire = partitionExpire; - this.tagAutoCreation = tagAutoCreation; + this.tagAutoManager = tagAutoManager; this.lock = lock; this.consumerExpireTime = consumerExpireTime; @@ -131,8 +131,14 @@ public TableCommitImpl( } public boolean forceCreatingSnapshot() { - return this.forceCreatingSnapshot - || (tagAutoCreation != null && tagAutoCreation.forceCreatingSnapshot()); + if (this.forceCreatingSnapshot) { + return true; + } + if (tagAutoManager != null) { + return tagAutoManager.getTagAutoCreation() != null + && tagAutoManager.getTagAutoCreation().forceCreatingSnapshot(); + } + return false; } @Override @@ -349,8 +355,8 @@ private void expire(long partitionExpireIdentifier) { partitionExpire.expire(partitionExpireIdentifier); } - if (tagAutoCreation != null) { - tagAutoCreation.run(); + if (tagAutoManager != null) { + tagAutoManager.run(); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java index ebd8aa7af0db9..fd41ba26b0098 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java @@ -39,6 +39,7 @@ import org.apache.paimon.table.source.ReadOnceTableScan; import org.apache.paimon.table.source.Split; import org.apache.paimon.table.source.TableRead; +import org.apache.paimon.tag.Tag; import org.apache.paimon.types.BigIntType; import org.apache.paimon.types.DataField; import org.apache.paimon.types.RowType; @@ -63,6 +64,7 @@ import java.util.SortedMap; import static org.apache.paimon.catalog.Catalog.SYSTEM_TABLE_SPLITTER; +import static org.apache.paimon.utils.Preconditions.checkArgument; /** A {@link Table} for showing tags of table. */ public class TagsTable implements ReadonlyTable { @@ -79,7 +81,10 @@ public class TagsTable implements ReadonlyTable { new DataField(2, "schema_id", new BigIntType(false)), new DataField(3, "commit_time", new TimestampType(false, 3)), new DataField(4, "record_count", new BigIntType(true)), - new DataField(5, "branches", SerializationUtils.newStringType(true)))); + new DataField(5, "branches", SerializationUtils.newStringType(true)), + new DataField(6, "create_time", new TimestampType(false, 3)), + new DataField( + 7, "time_retained", SerializationUtils.newStringType(true)))); private final FileIO fileIO; private final Path location; @@ -234,17 +239,21 @@ public RecordReader createReader(Split split) { } private InternalRow toRow( - Map.Entry tag, Map> tagBranches) { - Snapshot snapshot = tag.getValue(); - List branches = tagBranches.get(tag.getKey()); + Map.Entry snapshot, Map> tagBranches) { + checkArgument( + snapshot.getValue() instanceof Tag, + "There is a bug, Snapshot in tagManager.tags() must be Tag."); + Tag tag = (Tag) snapshot.getValue(); + List branches = tagBranches.get(snapshot.getKey()); return GenericRow.of( - BinaryString.fromString(tag.getKey()), - snapshot.id(), - snapshot.schemaId(), - Timestamp.fromLocalDateTime( - DateTimeUtils.toLocalDateTime(snapshot.timeMillis())), - snapshot.totalRecordCount(), - BinaryString.fromString(branches == null ? "[]" : branches.toString())); + BinaryString.fromString(snapshot.getKey()), + tag.id(), + tag.schemaId(), + Timestamp.fromLocalDateTime(DateTimeUtils.toLocalDateTime(tag.timeMillis())), + tag.totalRecordCount(), + BinaryString.fromString(branches == null ? "[]" : branches.toString()), + Timestamp.fromLocalDateTime(tag.getTagCreateTime()), + BinaryString.fromString(tag.getTagTimeRetained().toString())); } } } diff --git a/paimon-core/src/main/java/org/apache/paimon/tag/Tag.java b/paimon-core/src/main/java/org/apache/paimon/tag/Tag.java new file mode 100644 index 0000000000000..520b608afd362 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/tag/Tag.java @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.tag; + +import org.apache.paimon.Snapshot; +import org.apache.paimon.fs.FileIO; +import org.apache.paimon.fs.Path; +import org.apache.paimon.utils.JsonSerdeUtil; + +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty; + +import javax.annotation.Nullable; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.time.Duration; +import java.time.LocalDateTime; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; + +import static org.apache.paimon.CoreOptions.DEFAULT_TAG_TIME_RETAINED; + +/** Snapshot with tagCreateTime and tagTimeRetained. */ +public class Tag extends Snapshot { + + private static final String FIELD_TAG_CREATE_TIME = "tagCreateTime"; + private static final String FIELD_TAG_TIME_RETAINED = "tagTimeRetained"; + private static final LocalDateTime DEFAULT_TAG_CREATE_TIME = + LocalDateTime.of(1970, 1, 1, 0, 0, 0, 0); + + @JsonProperty(FIELD_TAG_CREATE_TIME) + @Nullable + private final LocalDateTime tagCreateTime; + + @JsonProperty(FIELD_TAG_TIME_RETAINED) + @Nullable + private final Duration tagTimeRetained; + + @JsonCreator + public Tag( + @JsonProperty(FIELD_VERSION) @Nullable Integer version, + @JsonProperty(FIELD_ID) long id, + @JsonProperty(FIELD_SCHEMA_ID) long schemaId, + @JsonProperty(FIELD_BASE_MANIFEST_LIST) String baseManifestList, + @JsonProperty(FIELD_DELTA_MANIFEST_LIST) String deltaManifestList, + @JsonProperty(FIELD_CHANGELOG_MANIFEST_LIST) @Nullable String changelogManifestList, + @JsonProperty(FIELD_INDEX_MANIFEST) @Nullable String indexManifest, + @JsonProperty(FIELD_COMMIT_USER) String commitUser, + @JsonProperty(FIELD_COMMIT_IDENTIFIER) long commitIdentifier, + @JsonProperty(FIELD_COMMIT_KIND) CommitKind commitKind, + @JsonProperty(FIELD_TIME_MILLIS) long timeMillis, + @JsonProperty(FIELD_LOG_OFFSETS) Map logOffsets, + @JsonProperty(FIELD_TOTAL_RECORD_COUNT) @Nullable Long totalRecordCount, + @JsonProperty(FIELD_DELTA_RECORD_COUNT) @Nullable Long deltaRecordCount, + @JsonProperty(FIELD_CHANGELOG_RECORD_COUNT) @Nullable Long changelogRecordCount, + @JsonProperty(FIELD_WATERMARK) @Nullable Long watermark, + @JsonProperty(FIELD_STATISTICS) @Nullable String statistics, + @JsonProperty(FIELD_TAG_CREATE_TIME) @Nullable LocalDateTime tagCreateTime, + @JsonProperty(FIELD_TAG_TIME_RETAINED) @Nullable Duration tagTimeRetained) { + super( + version, + id, + schemaId, + baseManifestList, + deltaManifestList, + changelogManifestList, + indexManifest, + commitUser, + commitIdentifier, + commitKind, + timeMillis, + logOffsets, + totalRecordCount, + deltaRecordCount, + changelogRecordCount, + watermark, + statistics); + this.tagCreateTime = tagCreateTime == null ? DEFAULT_TAG_CREATE_TIME : tagCreateTime; + this.tagTimeRetained = + tagTimeRetained == null ? DEFAULT_TAG_TIME_RETAINED : tagTimeRetained; + } + + @JsonGetter(FIELD_TAG_CREATE_TIME) + public LocalDateTime getTagCreateTime() { + return tagCreateTime; + } + + @JsonGetter(FIELD_TAG_TIME_RETAINED) + public Duration getTagTimeRetained() { + return tagTimeRetained; + } + + public String toJson() { + return JsonSerdeUtil.toJson(this); + } + + public static Tag fromJson(String json) { + return JsonSerdeUtil.fromJson(json, Tag.class); + } + + public static Tag fromPath(FileIO fileIO, Path path) { + try { + String json = fileIO.readFileUtf8(path); + return Tag.fromJson(json); + } catch (IOException e) { + throw new RuntimeException("Fails to read tag from path " + path, e); + } + } + + public static Optional safelyFromTagPath(FileIO fileIO, Path path) throws IOException { + try { + String json = fileIO.readFileUtf8(path); + return Optional.of(Tag.fromJson(json)); + } catch (FileNotFoundException e) { + return Optional.empty(); + } + } + + public static Tag fromSnapshotAndTagTtl( + Snapshot snapshot, Duration tagTimeRetained, LocalDateTime tagCreateTime) { + return new Tag( + snapshot.version(), + snapshot.id(), + snapshot.schemaId(), + snapshot.baseManifestList(), + snapshot.deltaManifestList(), + snapshot.changelogManifestList(), + snapshot.indexManifest(), + snapshot.commitUser(), + snapshot.commitIdentifier(), + snapshot.commitKind(), + snapshot.timeMillis(), + snapshot.logOffsets(), + snapshot.totalRecordCount(), + snapshot.deltaRecordCount(), + snapshot.changelogRecordCount(), + snapshot.watermark(), + snapshot.statistics(), + tagCreateTime, + tagTimeRetained); + } + + public Snapshot toSnapshot() { + return new Snapshot( + version, + id, + schemaId, + baseManifestList, + deltaManifestList, + changelogManifestList, + indexManifest, + commitUser, + commitIdentifier, + commitKind, + timeMillis, + logOffsets, + totalRecordCount, + deltaRecordCount, + changelogRecordCount, + watermark, + statistics); + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), tagCreateTime, tagTimeRetained); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + if (!super.equals(o)) { + return false; + } + Tag that = (Tag) o; + return Objects.equals(tagCreateTime, that.tagCreateTime) + && Objects.equals(tagTimeRetained, that.tagTimeRetained); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java b/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java index 505454313bd20..a3542e494f314 100644 --- a/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java +++ b/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java @@ -20,7 +20,6 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.Snapshot; -import org.apache.paimon.operation.TagDeletion; import org.apache.paimon.table.sink.TagCallback; import org.apache.paimon.tag.TagTimeExtractor.ProcessTimeExtractor; import org.apache.paimon.tag.TagTimeExtractor.WatermarkExtractor; @@ -46,11 +45,10 @@ public class TagAutoCreation { private final SnapshotManager snapshotManager; private final TagManager tagManager; - private final TagDeletion tagDeletion; private final TagTimeExtractor timeExtractor; private final TagPeriodHandler periodHandler; private final Duration delay; - private final Integer numRetainedMax; + private final Duration timeRetained; private final List callbacks; private final Duration idlenessTimeout; @@ -60,20 +58,18 @@ public class TagAutoCreation { private TagAutoCreation( SnapshotManager snapshotManager, TagManager tagManager, - TagDeletion tagDeletion, TagTimeExtractor timeExtractor, TagPeriodHandler periodHandler, Duration delay, - Integer numRetainedMax, + Duration timeRetained, Duration idlenessTimeout, List callbacks) { this.snapshotManager = snapshotManager; this.tagManager = tagManager; - this.tagDeletion = tagDeletion; this.timeExtractor = timeExtractor; this.periodHandler = periodHandler; this.delay = delay; - this.numRetainedMax = numRetainedMax; + this.timeRetained = timeRetained; this.callbacks = callbacks; this.idlenessTimeout = idlenessTimeout; @@ -118,7 +114,7 @@ public boolean forceCreatingSnapshot() { public void run() { while (true) { if (snapshotManager.snapshotExists(nextSnapshot)) { - tryToTag(snapshotManager.snapshot(nextSnapshot)); + tryToCreateTags(snapshotManager.snapshot(nextSnapshot)); nextSnapshot++; } else { // avoid snapshot has been expired @@ -132,7 +128,7 @@ public void run() { } } - private void tryToTag(Snapshot snapshot) { + private void tryToCreateTags(Snapshot snapshot) { Optional timeOptional = timeExtractor.extract(snapshot.timeMillis(), snapshot.watermark()); if (!timeOptional.isPresent()) { @@ -144,28 +140,8 @@ private void tryToTag(Snapshot snapshot) { || isAfterOrEqual(time.minus(delay), periodHandler.nextTagTime(nextTag))) { LocalDateTime thisTag = periodHandler.normalizeToPreviousTag(time); String tagName = periodHandler.timeToTag(thisTag); - tagManager.createTag(snapshot, tagName, callbacks); + tagManager.createTag(snapshot, tagName, timeRetained, callbacks); nextTag = periodHandler.nextTagTime(thisTag); - - if (numRetainedMax != null) { - // only handle auto-created tags here - SortedMap> tags = tagManager.tags(periodHandler::isAutoTag); - if (tags.size() > numRetainedMax) { - int toDelete = tags.size() - numRetainedMax; - int i = 0; - for (List tag : tags.values()) { - tagManager.deleteTag( - checkAndGetOneAutoTag(tag), - tagDeletion, - snapshotManager, - callbacks); - i++; - if (i == toDelete) { - break; - } - } - } - } } } @@ -187,7 +163,6 @@ public static TagAutoCreation create( CoreOptions options, SnapshotManager snapshotManager, TagManager tagManager, - TagDeletion tagDeletion, List callbacks) { TagTimeExtractor extractor = TagTimeExtractor.createForAutoTag(options); if (extractor == null) { @@ -196,11 +171,10 @@ public static TagAutoCreation create( return new TagAutoCreation( snapshotManager, tagManager, - tagDeletion, extractor, TagPeriodHandler.create(options), options.tagCreationDelay(), - options.tagNumRetainedMax(), + options.tagTimeRetained(), options.snapshotWatermarkIdleTimeout(), callbacks); } diff --git a/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoExpire.java b/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoExpire.java new file mode 100644 index 0000000000000..8af934b5b6de9 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoExpire.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.tag; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.Snapshot; +import org.apache.paimon.operation.TagDeletion; +import org.apache.paimon.table.sink.TagCallback; +import org.apache.paimon.utils.SnapshotManager; +import org.apache.paimon.utils.TagManager; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.time.LocalDateTime; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.SortedMap; + +import static org.apache.paimon.utils.Preconditions.checkArgument; + +/** A manager to expire tags. */ +public class TagAutoExpire { + + private static final Logger LOG = LoggerFactory.getLogger(TagAutoExpire.class); + + private final SnapshotManager snapshotManager; + private final TagManager tagManager; + private final TagDeletion tagDeletion; + private final TagPeriodHandler periodHandler; + private final Integer numRetainedMax; + private final List callbacks; + + private TagAutoExpire( + SnapshotManager snapshotManager, + TagManager tagManager, + TagDeletion tagDeletion, + TagPeriodHandler periodHandler, + Duration delay, + Integer numRetainedMax, + List callbacks) { + this.snapshotManager = snapshotManager; + this.tagManager = tagManager; + this.tagDeletion = tagDeletion; + this.periodHandler = periodHandler; + this.numRetainedMax = numRetainedMax; + this.callbacks = callbacks; + this.periodHandler.validateDelay(delay); + } + + public void run() { + Set deleteTags = new HashSet<>(); + deleteTags.addAll(getExpireTagsByNumRetainedMax()); + deleteTags.addAll(getExpireTagsByTimeRetained()); + deleteTags.forEach( + tag -> tagManager.deleteTag(tag, tagDeletion, snapshotManager, callbacks)); + } + + private Set getExpireTagsByNumRetainedMax() { + Set deleteTags = new HashSet<>(); + if (numRetainedMax != null) { + // only handle auto-created tags here + SortedMap> tags = tagManager.tags(periodHandler::isAutoTag); + if (tags.size() > numRetainedMax) { + int toDelete = tags.size() - numRetainedMax; + int i = 0; + for (List tag : tags.values()) { + String tagName = TagAutoCreation.checkAndGetOneAutoTag(tag); + LOG.info( + "Delete tag {}, because the number of auto-created tags reached numRetainedMax of {}.", + tagName, + numRetainedMax); + deleteTags.add(tagName); + i++; + if (i == toDelete) { + break; + } + } + } + } + return deleteTags; + } + + private Set getExpireTagsByTimeRetained() { + // handle auto-created and non-auto-created-tags here + Set deleteTags = new HashSet<>(); + SortedMap> tags = tagManager.tags(); + for (Map.Entry> entry : tags.entrySet()) { + checkArgument( + entry.getKey() instanceof Tag, + "There is a bug, snapshot in tagManager.tags() must be Tag."); + Tag tag = (Tag) entry.getKey(); + LocalDateTime createTime = tag.getTagCreateTime(); + Duration timeRetained = tag.getTagTimeRetained(); + if (LocalDateTime.now().isAfter(createTime.plus(timeRetained))) { + for (String tagName : entry.getValue()) { + LOG.info( + "Delete tag {}, because its existence time has reached its timeRetained of {}.", + tagName, + timeRetained); + deleteTags.add(tagName); + } + } + } + return deleteTags; + } + + public static TagAutoExpire create( + CoreOptions options, + SnapshotManager snapshotManager, + TagManager tagManager, + TagDeletion tagDeletion, + List callbacks) { + return new TagAutoExpire( + snapshotManager, + tagManager, + tagDeletion, + TagPeriodHandler.create(options), + options.tagCreationDelay(), + options.tagNumRetainedMax(), + callbacks); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoManager.java b/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoManager.java new file mode 100644 index 0000000000000..a991559479a9f --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoManager.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.tag; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.operation.TagDeletion; +import org.apache.paimon.table.sink.TagCallback; +import org.apache.paimon.utils.SnapshotManager; +import org.apache.paimon.utils.TagManager; + +import java.util.List; + +/** A manager to create and expire tags. */ +public class TagAutoManager { + + private final TagAutoCreation tagAutoCreation; + private final TagAutoExpire tagAutoExpire; + + private TagAutoManager(TagAutoCreation tagAutoCreation, TagAutoExpire tagAutoExpire) { + this.tagAutoCreation = tagAutoCreation; + this.tagAutoExpire = tagAutoExpire; + } + + public void run() { + if (tagAutoCreation != null) { + tagAutoCreation.run(); + } + if (tagAutoExpire != null) { + tagAutoExpire.run(); + } + } + + public static TagAutoManager create( + CoreOptions options, + SnapshotManager snapshotManager, + TagManager tagManager, + TagDeletion tagDeletion, + List callbacks) { + TagTimeExtractor extractor = TagTimeExtractor.createForAutoTag(options); + + return new TagAutoManager( + extractor == null + ? null + : TagAutoCreation.create(options, snapshotManager, tagManager, callbacks), + TagAutoExpire.create(options, snapshotManager, tagManager, tagDeletion, callbacks)); + } + + public TagAutoCreation getTagAutoCreation() { + return tagAutoCreation; + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/JsonSerdeUtil.java b/paimon-core/src/main/java/org/apache/paimon/utils/JsonSerdeUtil.java index 676276a30e58b..aa493dd4a90b3 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/JsonSerdeUtil.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/JsonSerdeUtil.java @@ -36,6 +36,7 @@ import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer; import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.module.SimpleModule; import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ser.std.StdSerializer; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; import javax.annotation.Nullable; @@ -57,6 +58,7 @@ public class JsonSerdeUtil { static { OBJECT_MAPPER_INSTANCE = new ObjectMapper(); OBJECT_MAPPER_INSTANCE.registerModule(createPaimonJacksonModule()); + OBJECT_MAPPER_INSTANCE.registerModule(new JavaTimeModule()); } public static LinkedHashMap parseJsonMap(String jsonString, Class valueType) { diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java index 134dea459f02a..c23ad9a6bdb6f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java @@ -26,11 +26,14 @@ import org.apache.paimon.manifest.ManifestEntry; import org.apache.paimon.operation.TagDeletion; import org.apache.paimon.table.sink.TagCallback; +import org.apache.paimon.tag.Tag; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.time.Duration; +import java.time.LocalDateTime; import java.util.ArrayList; import java.util.Collection; import java.util.Comparator; @@ -75,7 +78,8 @@ public Path branchTagPath(String branchName, String tagName) { } /** Create a tag from given snapshot and save it in the storage. */ - public void createTag(Snapshot snapshot, String tagName, List callbacks) { + public void createTag( + Snapshot snapshot, String tagName, Duration timeRetained, List callbacks) { checkArgument(!StringUtils.isBlank(tagName), "Tag name '%s' is blank.", tagName); // skip create tag for the same snapshot of the same name. @@ -86,7 +90,10 @@ public void createTag(Snapshot snapshot, String tagName, List callb } else { Path newTagPath = tagPath(tagName); try { - fileIO.writeFileUtf8(newTagPath, snapshot.toJson()); + fileIO.writeFileUtf8( + newTagPath, + Tag.fromSnapshotAndTagTtl(snapshot, timeRetained, LocalDateTime.now()) + .toJson()); } catch (IOException e) { throw new RuntimeException( String.format( @@ -227,7 +234,7 @@ public boolean tagExists(String tagName) { /** Get the tagged snapshot by name. */ public Snapshot taggedSnapshot(String tagName) { checkArgument(tagExists(tagName), "Tag '%s' doesn't exist.", tagName); - return Snapshot.fromPath(fileIO, tagPath(tagName)); + return Tag.fromPath(fileIO, tagPath(tagName)).toSnapshot(); } public long tagCount() { @@ -276,10 +283,10 @@ public SortedMap> tags(Predicate filter) { } // If the tag file is not found, it might be deleted by // other processes, so just skip this tag - Snapshot.safelyFromPath(fileIO, path) + Tag.safelyFromTagPath(fileIO, path) .ifPresent( - snapshot -> - tags.computeIfAbsent(snapshot, s -> new ArrayList<>()) + tag -> + tags.computeIfAbsent(tag, s -> new ArrayList<>()) .add(tagName)); } } catch (IOException e) { diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java index c8c7be7e12e3e..492f3c5c753f5 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java @@ -58,6 +58,7 @@ import java.util.stream.Collectors; import static java.util.Objects.requireNonNull; +import static org.apache.paimon.CoreOptions.DEFAULT_TAG_TIME_RETAINED; import static org.apache.paimon.data.BinaryRow.EMPTY_ROW; import static org.assertj.core.api.Assertions.assertThat; @@ -140,7 +141,8 @@ public void testMixedSnapshotAndTagDeletion() throws Exception { // create tags for each snapshot for (int id = 1; id <= latestSnapshotId; id++) { Snapshot snapshot = snapshotManager.snapshot(id); - tagManager.createTag(snapshot, "tag" + id, Collections.emptyList()); + tagManager.createTag( + snapshot, "tag" + id, DEFAULT_TAG_TIME_RETAINED, Collections.emptyList()); } // randomly expire snapshots diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java index 819b70d8a5702..13dfa00d023ff 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java @@ -60,6 +60,7 @@ import java.util.concurrent.ThreadLocalRandom; import java.util.stream.Collectors; +import static org.apache.paimon.CoreOptions.DEFAULT_TAG_TIME_RETAINED; import static org.apache.paimon.operation.FileStoreCommitImpl.mustConflictCheck; import static org.apache.paimon.operation.FileStoreTestUtils.assertPathExists; import static org.apache.paimon.operation.FileStoreTestUtils.assertPathNotExists; @@ -736,6 +737,6 @@ private void cleanBucket(TestFileStore store, BinaryRow partition, int bucket) { } private void createTag(Snapshot snapshot, String tagName) { - tagManager.createTag(snapshot, tagName, Collections.emptyList()); + tagManager.createTag(snapshot, tagName, DEFAULT_TAG_TIME_RETAINED, Collections.emptyList()); } } diff --git a/paimon-core/src/test/java/org/apache/paimon/table/system/TagsTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/system/TagsTableTest.java index 51cfcd702f8c7..d38197800ccf5 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/system/TagsTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/system/TagsTableTest.java @@ -30,6 +30,7 @@ import org.apache.paimon.table.Table; import org.apache.paimon.table.TableTestBase; import org.apache.paimon.table.sink.TableCommitImpl; +import org.apache.paimon.tag.Tag; import org.apache.paimon.types.DataTypes; import org.apache.paimon.utils.DateTimeUtils; import org.apache.paimon.utils.TagManager; @@ -45,6 +46,7 @@ import java.util.Map; import java.util.function.Function; +import static org.apache.paimon.utils.Preconditions.checkArgument; import static org.assertj.core.api.Assertions.assertThat; /** Unit tests for {@link TagsTable}. */ @@ -120,19 +122,24 @@ void testTagBranchesTable() throws Exception { private List getExceptedResult( Function> tagBranchesFunction) { List internalRows = new ArrayList<>(); - for (Map.Entry> tag : tagManager.tags().entrySet()) { - Snapshot snapshot = tag.getKey(); - for (String tagName : tag.getValue()) { + for (Map.Entry> snapshot : tagManager.tags().entrySet()) { + checkArgument( + snapshot.getKey() instanceof Tag, + "There is a bug, Snapshot in tagManager.tags() must be Tag."); + Tag tag = (Tag) snapshot.getKey(); + for (String tagName : snapshot.getValue()) { internalRows.add( GenericRow.of( BinaryString.fromString(tagName), - snapshot.id(), - snapshot.schemaId(), + tag.id(), + tag.schemaId(), Timestamp.fromLocalDateTime( - DateTimeUtils.toLocalDateTime(snapshot.timeMillis())), - snapshot.totalRecordCount(), + DateTimeUtils.toLocalDateTime(tag.timeMillis())), + tag.totalRecordCount(), BinaryString.fromString( - tagBranchesFunction.apply(tagName).toString()))); + tagBranchesFunction.apply(tagName).toString()), + Timestamp.fromLocalDateTime(tag.getTagCreateTime()), + BinaryString.fromString(tag.getTagTimeRetained().toString()))); } } return internalRows; diff --git a/paimon-core/src/test/java/org/apache/paimon/tag/TagAutoCreationTest.java b/paimon-core/src/test/java/org/apache/paimon/tag/TagAutoManagerTest.java similarity index 91% rename from paimon-core/src/test/java/org/apache/paimon/tag/TagAutoCreationTest.java rename to paimon-core/src/test/java/org/apache/paimon/tag/TagAutoManagerTest.java index f76a58e6cfd18..97c899bc2b66d 100644 --- a/paimon-core/src/test/java/org/apache/paimon/tag/TagAutoCreationTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/tag/TagAutoManagerTest.java @@ -43,10 +43,11 @@ import static org.apache.paimon.CoreOptions.TAG_CREATION_PERIOD; import static org.apache.paimon.CoreOptions.TAG_NUM_RETAINED_MAX; import static org.apache.paimon.CoreOptions.TAG_PERIOD_FORMATTER; +import static org.apache.paimon.CoreOptions.TAG_TIME_RETAINED; import static org.assertj.core.api.Assertions.assertThat; /** Test for tag automatic creation. */ -public class TagAutoCreationTest extends PrimaryKeyTableTestBase { +public class TagAutoManagerTest extends PrimaryKeyTableTestBase { @Test public void testTag() throws Exception { @@ -320,6 +321,31 @@ public void testWatermarkIdleTimeoutForceCreatingSnapshot() throws Exception { commit.close(); } + @Test + public void testExpireTagsByTimeRetained() throws Exception { + Options options = new Options(); + options.set(TAG_AUTOMATIC_CREATION, TagCreationMode.WATERMARK); + options.set(TAG_CREATION_PERIOD, TagCreationPeriod.HOURLY); + options.set(TAG_NUM_RETAINED_MAX, 3); + options.set(TAG_TIME_RETAINED, Duration.ofMillis(500)); + FileStoreTable table = this.table.copy(options.toMap()); + TableCommitImpl commit = table.newCommit(commitUser).ignoreEmptyCommit(false); + TagManager tagManager = table.store().newTagManager(); + + // test normal creation + commit.commit(new ManifestCommittable(0, utcMills("2023-07-18T12:12:00"))); + commit.commit(new ManifestCommittable(1, utcMills("2023-07-18T14:00:00"))); + commit.commit(new ManifestCommittable(2, utcMills("2023-07-18T15:12:00"))); + commit.commit(new ManifestCommittable(3, utcMills("2023-07-18T16:00:00"))); + + // test expire old tag by time-retained + Thread.sleep(1000); + commit.commit(new ManifestCommittable(4, utcMills("2023-07-18T19:00:00"))); + assertThat(tagManager.allTagNames()).containsOnly("2023-07-18 18"); + + commit.close(); + } + private long localZoneMills(String timestamp) { return LocalDateTime.parse(timestamp) .atZone(ZoneId.systemDefault()) diff --git a/paimon-core/src/test/java/org/apache/paimon/tag/TagTest.java b/paimon-core/src/test/java/org/apache/paimon/tag/TagTest.java new file mode 100644 index 0000000000000..5b9af3604292e --- /dev/null +++ b/paimon-core/src/test/java/org/apache/paimon/tag/TagTest.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.tag; + +import org.apache.paimon.Snapshot; + +import org.junit.Assert; +import org.junit.Test; + +import java.time.Duration; +import java.time.LocalDateTime; + +/** Test for {@link Tag}. */ +public class TagTest { + + private final Snapshot snapshot = + new Snapshot( + 0, + 0L, + null, + null, + null, + null, + null, + 0L, + Snapshot.CommitKind.APPEND, + 1000, + null, + null, + null, + null, + null, + null); + + @Test + public void testFromJson() { + Tag tag = Tag.fromJson(snapshot.toJson()); + Assert.assertEquals( + "{\n" + + " \"version\" : 3,\n" + + " \"id\" : 0,\n" + + " \"schemaId\" : 0,\n" + + " \"baseManifestList\" : null,\n" + + " \"deltaManifestList\" : null,\n" + + " \"changelogManifestList\" : null,\n" + + " \"commitUser\" : null,\n" + + " \"commitIdentifier\" : 0,\n" + + " \"commitKind\" : \"APPEND\",\n" + + " \"timeMillis\" : 1000,\n" + + " \"logOffsets\" : null,\n" + + " \"totalRecordCount\" : null,\n" + + " \"deltaRecordCount\" : null,\n" + + " \"changelogRecordCount\" : null,\n" + + " \"watermark\" : null,\n" + + " \"tagCreateTime\" : [ 1970, 1, 1, 0, 0 ],\n" + + " \"tagTimeRetained\" : 3153600000.000000000\n" + + "}", + tag.toJson()); + } + + @Test + public void testFromSnapshotAndTagTtl() { + Tag tag = + Tag.fromSnapshotAndTagTtl( + snapshot, + Duration.ofSeconds(5), + LocalDateTime.of(1969, 1, 1, 0, 0, 0, 123456789)); + Assert.assertEquals( + "{\n" + + " \"version\" : 3,\n" + + " \"id\" : 0,\n" + + " \"schemaId\" : 0,\n" + + " \"baseManifestList\" : null,\n" + + " \"deltaManifestList\" : null,\n" + + " \"changelogManifestList\" : null,\n" + + " \"commitUser\" : null,\n" + + " \"commitIdentifier\" : 0,\n" + + " \"commitKind\" : \"APPEND\",\n" + + " \"timeMillis\" : 1000,\n" + + " \"logOffsets\" : null,\n" + + " \"totalRecordCount\" : null,\n" + + " \"deltaRecordCount\" : null,\n" + + " \"changelogRecordCount\" : null,\n" + + " \"watermark\" : null,\n" + + " \"tagCreateTime\" : [ 1969, 1, 1, 0, 0, 0, 123456789 ],\n" + + " \"tagTimeRetained\" : 5.000000000\n" + + "}", + tag.toJson()); + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagAction.java index 0e500c3e74883..5928783c34a3c 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagAction.java @@ -18,12 +18,14 @@ package org.apache.paimon.flink.action; +import java.time.Duration; import java.util.Map; /** Create tag action for Flink. */ public class CreateTagAction extends TableActionBase { private final String tagName; + private final Duration timeRetained; private final Long snapshotId; public CreateTagAction( @@ -32,18 +34,20 @@ public CreateTagAction( String tableName, Map catalogConfig, String tagName, + Duration timeRetained, Long snapshotId) { super(warehouse, databaseName, tableName, catalogConfig); this.tagName = tagName; + this.timeRetained = timeRetained; this.snapshotId = snapshotId; } @Override public void run() throws Exception { if (snapshotId == null) { - table.createTag(tagName); + table.createTag(tagName, timeRetained); } else { - table.createTag(tagName, snapshotId); + table.createTag(tagName, timeRetained, snapshotId); } } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagActionFactory.java index 01bbdba42abff..2df28dbdde3b0 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagActionFactory.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagActionFactory.java @@ -18,11 +18,16 @@ package org.apache.paimon.flink.action; +import org.apache.paimon.utils.TimeUtils; + import org.apache.flink.api.java.tuple.Tuple3; +import java.time.Duration; import java.util.Map; import java.util.Optional; +import static org.apache.paimon.CoreOptions.DEFAULT_TAG_TIME_RETAINED; + /** Factory to create {@link CreateTagAction}. */ public class CreateTagActionFactory implements ActionFactory { @@ -30,6 +35,7 @@ public class CreateTagActionFactory implements ActionFactory { private static final String TAG_NAME = "tag_name"; private static final String SNAPSHOT = "snapshot"; + private static final String TIME_RETAINED = "time_retained"; @Override public String identifier() { @@ -49,9 +55,20 @@ public Optional create(MultipleParameterToolAdapter params) { snapshot = Long.parseLong(params.get(SNAPSHOT)); } + Duration timeRetained = DEFAULT_TAG_TIME_RETAINED; + if (params.has(TIME_RETAINED)) { + timeRetained = TimeUtils.parseDuration(params.get(TIME_RETAINED)); + } + CreateTagAction action = new CreateTagAction( - tablePath.f0, tablePath.f1, tablePath.f2, catalogConfig, tagName, snapshot); + tablePath.f0, + tablePath.f1, + tablePath.f2, + catalogConfig, + tagName, + timeRetained, + snapshot); return Optional.of(action); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateTagProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateTagProcedure.java index 9999fdf117761..3bdc2e07edc30 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateTagProcedure.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateTagProcedure.java @@ -21,6 +21,7 @@ import org.apache.paimon.catalog.Catalog; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.table.Table; +import org.apache.paimon.utils.TimeUtils; import org.apache.flink.table.procedure.ProcedureContext; @@ -30,7 +31,7 @@ * Create tag procedure. Usage: * *

- *  CALL sys.create_tag('tableId', 'tagName', snapshotId)
+ *  CALL sys.create_tag('tableId', 'tagName', 'timeRetained', snapshotId)
  * 
*/ public class CreateTagProcedure extends ProcedureBase { @@ -38,23 +39,29 @@ public class CreateTagProcedure extends ProcedureBase { public static final String IDENTIFIER = "create_tag"; public String[] call( - ProcedureContext procedureContext, String tableId, String tagName, long snapshotId) + ProcedureContext procedureContext, + String tableId, + String tagName, + String timeRetained, + long snapshotId) throws Catalog.TableNotExistException { - return innerCall(tableId, tagName, snapshotId); + return innerCall(tableId, tagName, timeRetained, snapshotId); } - public String[] call(ProcedureContext procedureContext, String tableId, String tagName) + public String[] call( + ProcedureContext procedureContext, String tableId, String tagName, String timeRetained) throws Catalog.TableNotExistException { - return innerCall(tableId, tagName, null); + return innerCall(tableId, tagName, timeRetained, null); } - private String[] innerCall(String tableId, String tagName, @Nullable Long snapshotId) + private String[] innerCall( + String tableId, String tagName, String timeRetained, @Nullable Long snapshotId) throws Catalog.TableNotExistException { Table table = catalog.getTable(Identifier.fromString(tableId)); if (snapshotId == null) { - table.createTag(tagName); + table.createTag(tagName, TimeUtils.parseDuration(timeRetained)); } else { - table.createTag(tagName, snapshotId); + table.createTag(tagName, TimeUtils.parseDuration(timeRetained), snapshotId); } return new String[] {"Success"}; } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperator.java index dcf2c8b0045cd..6d27c60194837 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperator.java @@ -46,6 +46,7 @@ import org.apache.flink.streaming.runtime.tasks.StreamTask; import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus; +import java.time.Duration; import java.util.ArrayList; import java.util.List; import java.util.NavigableSet; @@ -76,6 +77,8 @@ public class AutoTagForSavepointCommitterOperator private final NavigableSet identifiersForTags; + private final Duration tagTimeRetained; + private transient SnapshotManager snapshotManager; private transient TagManager tagManager; @@ -91,13 +94,15 @@ public AutoTagForSavepointCommitterOperator( SerializableSupplier snapshotManagerFactory, SerializableSupplier tagManagerFactory, SerializableSupplier tagDeletionFactory, - SerializableSupplier> callbacksSupplier) { + SerializableSupplier> callbacksSupplier, + Duration tagTimeRetained) { this.commitOperator = commitOperator; this.tagManagerFactory = tagManagerFactory; this.snapshotManagerFactory = snapshotManagerFactory; this.tagDeletionFactory = tagDeletionFactory; this.callbacksSupplier = callbacksSupplier; this.identifiersForTags = new TreeSet<>(); + this.tagTimeRetained = tagTimeRetained; } @Override @@ -167,7 +172,7 @@ private void createTagForIdentifiers(List identifiers) { for (Snapshot snapshot : snapshotForTags) { String tagName = SAVEPOINT_TAG_PREFIX + snapshot.commitIdentifier(); if (!tagManager.tagExists(tagName)) { - tagManager.createTag(snapshot, tagName, callbacks); + tagManager.createTag(snapshot, tagName, tagTimeRetained, callbacks); } } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperator.java index 2c898831ec2ce..93eac0431be64 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperator.java @@ -118,7 +118,11 @@ private void createTag() { tagName, tagDeletion, snapshotManager, table.store().createTagCallbacks()); } // Create a new tag - tagManager.createTag(snapshot, tagName, table.store().createTagCallbacks()); + tagManager.createTag( + snapshot, + tagName, + table.coreOptions().tagTimeRetained(), + table.store().createTagCallbacks()); // Expire the tag expireTag(); } catch (Exception e) { diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java index 97c426ee56854..3a4ddcb5dac1f 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java @@ -242,7 +242,8 @@ protected DataStreamSink doCommit(DataStream written, String com table::snapshotManager, table::tagManager, () -> table.store().newTagDeletion(), - () -> table.store().createTagCallbacks()); + () -> table.store().createTagCallbacks(), + table.coreOptions().tagTimeRetained()); } if (conf.get(ExecutionOptions.RUNTIME_MODE) == RuntimeExecutionMode.BATCH && table.coreOptions().tagCreationMode() == TagCreationMode.BATCH) { diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java index 8d445ab95b07e..eb6f70f655d30 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java @@ -36,6 +36,7 @@ /** IT cases for branch management actions. */ class BranchActionITCase extends ActionITCaseBase { + @Test void testCreateAndDeleteBranch() throws Exception { @@ -63,7 +64,8 @@ void testCreateAndDeleteBranch() throws Exception { TagManager tagManager = new TagManager(table.fileIO(), table.location()); callProcedure( - String.format("CALL sys.create_tag('%s.%s', 'tag2', 2)", database, tableName)); + String.format( + "CALL sys.create_tag('%s.%s', 'tag2', '5 d', 2)", database, tableName)); assertThat(tagManager.tagExists("tag2")).isTrue(); BranchManager branchManager = table.branchManager(); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/TagActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/TagActionITCase.java index a01849ed94caa..a650ad31b3478 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/TagActionITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/TagActionITCase.java @@ -83,7 +83,8 @@ public void testCreateAndDeleteTag() throws Exception { .run(); } else { callProcedure( - String.format("CALL sys.create_tag('%s.%s', 'tag2', 2)", database, tableName)); + String.format( + "CALL sys.create_tag('%s.%s', 'tag2', '5 d', 2)", database, tableName)); } assertThat(tagManager.tagExists("tag2")).isTrue(); @@ -153,7 +154,8 @@ public void testCreateLatestTag() throws Exception { .run(); } else { callProcedure( - String.format("CALL sys.create_tag('%s.%s', 'tag2', 2)", database, tableName)); + String.format( + "CALL sys.create_tag('%s.%s', 'tag2', '5 d', 2)", database, tableName)); } assertThat(tagManager.tagExists("tag2")).isTrue(); } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperatorTest.java index 0dbde05795fd3..4090444583207 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperatorTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperatorTest.java @@ -39,6 +39,7 @@ import java.util.List; import java.util.Map; +import static org.apache.paimon.CoreOptions.DEFAULT_TAG_TIME_RETAINED; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.fail; @@ -208,7 +209,8 @@ protected OneInputStreamOperator createCommitterOperat table::snapshotManager, table::tagManager, () -> table.store().newTagDeletion(), - () -> table.store().createTagCallbacks()); + () -> table.store().createTagCallbacks(), + DEFAULT_TAG_TIME_RETAINED); } @Override @@ -224,6 +226,7 @@ protected OneInputStreamOperator createCommitterOperat table::snapshotManager, table::tagManager, () -> table.store().newTagDeletion(), - () -> table.store().createTagCallbacks()); + () -> table.store().createTagCallbacks(), + DEFAULT_TAG_TIME_RETAINED); } } diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java index 4d9753babc349..88878c2c71cfa 100644 --- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java +++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java @@ -913,7 +913,7 @@ public void testAddPartitionsForTag() throws Exception { " 'metastore.tag-to-partition' = 'dt'", ")")); tEnv.executeSql("INSERT INTO t VALUES (1, 10), (2, 20)").await(); - tEnv.executeSql("CALL sys.create_tag('test_db.t', '2023-10-16', 1)"); + tEnv.executeSql("CALL sys.create_tag('test_db.t', '2023-10-16', '5 d', 1)"); assertThat(hiveShell.executeQuery("SHOW PARTITIONS t")) .containsExactlyInAnyOrder("dt=2023-10-16"); @@ -927,7 +927,7 @@ public void testAddPartitionsForTag() throws Exception { // another tag tEnv.executeSql("INSERT INTO t VALUES (3, 30), (4, 40)").await(); - tEnv.executeSql("CALL sys.create_tag('test_db.t', '2023-10-17', 2)"); + tEnv.executeSql("CALL sys.create_tag('test_db.t', '2023-10-17', '5 d', 2)"); assertThat(hiveShell.executeQuery("SELECT * FROM t")) .containsExactlyInAnyOrder( @@ -951,9 +951,9 @@ public void testDeletePartitionForTag() throws Exception { + " 'metastore.tag-to-partition' = 'dt'\n" + ")"); tEnv.executeSql("INSERT INTO t VALUES (1, 10), (2, 20)").await(); - tEnv.executeSql("CALL sys.create_tag('test_db.t', '2023-10-16', 1)"); + tEnv.executeSql("CALL sys.create_tag('test_db.t', '2023-10-16', '5 d', 1)"); tEnv.executeSql("INSERT INTO t VALUES (3, 30)").await(); - tEnv.executeSql("CALL sys.create_tag('test_db.t', '2023-10-17', 2)"); + tEnv.executeSql("CALL sys.create_tag('test_db.t', '2023-10-17', '5 d', 2)"); assertThat(hiveShell.executeQuery("SHOW PARTITIONS t")) .containsExactlyInAnyOrder("dt=2023-10-16", "dt=2023-10-17"); @@ -979,7 +979,7 @@ public void testHistoryPartitionsCascadeToUpdate() throws Exception { " 'metastore.tag-to-partition' = 'dt'", ")")); tEnv.executeSql("INSERT INTO t VALUES (1, 10), (2, 20)").await(); - tEnv.executeSql("CALL sys.create_tag('test_db.t', '2023-10-16', 1)"); + tEnv.executeSql("CALL sys.create_tag('test_db.t', '2023-10-16', '5 d', 1)"); assertThat(hiveShell.executeQuery("SHOW PARTITIONS t")) .containsExactlyInAnyOrder("dt=2023-10-16"); @@ -991,7 +991,7 @@ public void testHistoryPartitionsCascadeToUpdate() throws Exception { .containsExactlyInAnyOrder("1\t10\t2023-10-16", "2\t20\t2023-10-16"); tEnv.executeSql("INSERT INTO t VALUES (3, 30), (4, 40)").await(); - tEnv.executeSql("CALL sys.create_tag('test_db.t', '2023-10-17', 2)"); + tEnv.executeSql("CALL sys.create_tag('test_db.t', '2023-10-17', '5 d', 2)"); tEnv.executeSql("ALTER TABLE t ADD z INT"); tEnv.executeSql("INSERT INTO t VALUES (3, 30, 5), (4, 40, 6)").await(); diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateTagProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateTagProcedure.java index 7f9bdb62108b6..c1db61a7a00bd 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateTagProcedure.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateTagProcedure.java @@ -18,6 +18,8 @@ package org.apache.paimon.spark.procedure; +import org.apache.paimon.utils.TimeUtils; + import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.connector.catalog.Identifier; import org.apache.spark.sql.connector.catalog.TableCatalog; @@ -26,6 +28,9 @@ import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; +import java.time.Duration; + +import static org.apache.paimon.CoreOptions.DEFAULT_TAG_TIME_RETAINED; import static org.apache.spark.sql.types.DataTypes.LongType; import static org.apache.spark.sql.types.DataTypes.StringType; @@ -36,6 +41,7 @@ public class CreateTagProcedure extends BaseProcedure { new ProcedureParameter[] { ProcedureParameter.required("table", StringType), ProcedureParameter.required("tag", StringType), + ProcedureParameter.optional("time_retained", StringType), ProcedureParameter.optional("snapshot", LongType) }; @@ -63,15 +69,19 @@ public StructType outputType() { public InternalRow[] call(InternalRow args) { Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name()); String tag = args.getString(1); - Long snapshot = args.isNullAt(2) ? null : args.getLong(2); + Duration timeRetained = + args.isNullAt(2) + ? DEFAULT_TAG_TIME_RETAINED + : TimeUtils.parseDuration(args.getString(2)); + Long snapshot = args.isNullAt(3) ? null : args.getLong(3); return modifyPaimonTable( tableIdent, table -> { if (snapshot == null) { - table.createTag(tag); + table.createTag(tag, timeRetained); } else { - table.createTag(tag, snapshot); + table.createTag(tag, timeRetained, snapshot); } InternalRow outputRow = newInternalRow(true); return new InternalRow[] {outputRow}; diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CreateAndDeleteTagProcedureTest.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CreateAndDeleteTagProcedureTest.scala index e505ae110033f..3c9531a8a7d31 100644 --- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CreateAndDeleteTagProcedureTest.scala +++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CreateAndDeleteTagProcedureTest.scala @@ -70,7 +70,8 @@ class CreateAndDeleteTagProcedureTest extends PaimonSparkTestBase with StreamTes checkAnswer(query(), Row(1, "a") :: Row(2, "b2") :: Nil) checkAnswer( spark.sql( - "CALL paimon.sys.create_tag(table => 'test.T', tag => 'test_tag', snapshot => 2)"), + "CALL paimon.sys.create_tag(" + + "table => 'test.T', tag => 'test_tag', time_retained => '5 d', snapshot => 2)"), Row(true) :: Nil) checkAnswer( spark.sql("SELECT tag_name FROM paimon.test.`T$tags`"),