diff --git a/docs/content/maintenance/manage-tags.md b/docs/content/maintenance/manage-tags.md index fc499f607173..26b657ebbd46 100644 --- a/docs/content/maintenance/manage-tags.md +++ b/docs/content/maintenance/manage-tags.md @@ -108,6 +108,7 @@ You can create a tag with given name and snapshot ID. --database \ --table \ --tag_name \ + --time_retained \ [--snapshot ] \ [--catalog_conf [--catalog_conf ...]] ``` @@ -126,6 +127,7 @@ public class CreateTag { public static void main(String[] args) { Table table = ...; table.createTag("my-tag", 1); + table.createTag("my-tag-retained-12-hours", Duration.ofHours(12), 1); } } ``` @@ -138,6 +140,11 @@ Run the following sql: CALL create_tag(table => 'test.t', tag => 'test_tag', snapshot => 2); ``` +To create a tag with retained 1 day, run the following sql: +```sql +CALL create_tag(table => 'test.t', tag => 'test_tag', time_retained => '1 d', snapshot => 2); +``` + To create a tag based on the latest snapshot id, run the following sql: ```sql CALL create_tag(table => 'test.t', tag => 'test_tag'); diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html index 4c42bbfbbb58..f09627148903 100644 --- a/docs/layouts/shortcodes/generated/core_configuration.html +++ b/docs/layouts/shortcodes/generated/core_configuration.html @@ -678,6 +678,12 @@

Enum

What frequency is used to generate tags.

Possible values:
  • "daily": Generate a tag every day.
  • "hourly": Generate a tag every hour.
  • "two-hours": Generate a tag every two hours.
+ +
tag.default-time-retained
+ (none) + Duration + The maximum default time retained for all tags. +
tag.num-retained-max
(none) diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java index 37bd0c217f57..f3ce61f6d2fd 100644 --- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java @@ -1014,6 +1014,12 @@ public class CoreOptions implements Serializable { .noDefaultValue() .withDescription("The maximum number of tags to retain."); + public static final ConfigOption TAG_DEFAULT_TIME_RETAINED = + key("tag.default-time-retained") + .durationType() + .noDefaultValue() + .withDescription("The maximum default time retained for all tags."); + public static final ConfigOption SNAPSHOT_WATERMARK_IDLE_TIMEOUT = key("snapshot.watermark-idle-timeout") .durationType() @@ -1647,6 +1653,10 @@ public Integer tagNumRetainedMax() { return options.get(TAG_NUM_RETAINED_MAX); } + public Duration tagDefaultTimeRetained() { + return options.get(TAG_DEFAULT_TIME_RETAINED); + } + public Duration snapshotWatermarkIdleTimeout() { return options.get(SNAPSHOT_WATERMARK_IDLE_TIMEOUT); } diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java index 87cc4e65c544..b7830f0ace51 100644 --- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java @@ -41,7 +41,7 @@ import org.apache.paimon.table.CatalogEnvironment; import org.apache.paimon.table.sink.CallbackUtils; import org.apache.paimon.table.sink.TagCallback; -import org.apache.paimon.tag.TagAutoCreation; +import org.apache.paimon.tag.TagAutoManager; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.FileStorePathFactory; import org.apache.paimon.utils.SegmentsCache; @@ -249,8 +249,8 @@ public PartitionExpire newPartitionExpire(String commitUser) { @Override @Nullable - public TagAutoCreation newTagCreationManager() { - return TagAutoCreation.create( + public TagAutoManager newTagCreationManager() { + return TagAutoManager.create( options, snapshotManager(), newTagManager(), diff --git a/paimon-core/src/main/java/org/apache/paimon/FileStore.java b/paimon-core/src/main/java/org/apache/paimon/FileStore.java index 6731121c567b..1d9e247d4392 100644 --- a/paimon-core/src/main/java/org/apache/paimon/FileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/FileStore.java @@ -33,7 +33,7 @@ import org.apache.paimon.stats.StatsFileHandler; import org.apache.paimon.table.BucketMode; import org.apache.paimon.table.sink.TagCallback; -import org.apache.paimon.tag.TagAutoCreation; +import org.apache.paimon.tag.TagAutoManager; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.FileStorePathFactory; import org.apache.paimon.utils.SnapshotManager; @@ -93,7 +93,7 @@ public interface FileStore extends Serializable { PartitionExpire newPartitionExpire(String commitUser); @Nullable - TagAutoCreation newTagCreationManager(); + TagAutoManager newTagCreationManager(); ServiceManager newServiceManager(); diff --git a/paimon-core/src/main/java/org/apache/paimon/Snapshot.java b/paimon-core/src/main/java/org/apache/paimon/Snapshot.java index 0ac23ecf05dc..0c42c4e43093 100644 --- a/paimon-core/src/main/java/org/apache/paimon/Snapshot.java +++ b/paimon-core/src/main/java/org/apache/paimon/Snapshot.java @@ -95,37 +95,37 @@ public class Snapshot { // null for paimon <= 0.2 @JsonProperty(FIELD_VERSION) @Nullable - private final Integer version; + protected final Integer version; @JsonProperty(FIELD_ID) - private final long id; + protected final long id; @JsonProperty(FIELD_SCHEMA_ID) - private final long schemaId; + protected final long schemaId; // a manifest list recording all changes from the previous snapshots @JsonProperty(FIELD_BASE_MANIFEST_LIST) - private final String baseManifestList; + protected final String baseManifestList; // a manifest list recording all new changes occurred in this snapshot // for faster expire and streaming reads @JsonProperty(FIELD_DELTA_MANIFEST_LIST) - private final String deltaManifestList; + protected final String deltaManifestList; // a manifest list recording all changelog produced in this snapshot // null if no changelog is produced, or for paimon <= 0.2 @JsonProperty(FIELD_CHANGELOG_MANIFEST_LIST) @Nullable - private final String changelogManifestList; + protected final String changelogManifestList; // a manifest recording all index files of this table // null if no index file @JsonProperty(FIELD_INDEX_MANIFEST) @JsonInclude(JsonInclude.Include.NON_NULL) - private final String indexManifest; + protected final String indexManifest; @JsonProperty(FIELD_COMMIT_USER) - private final String commitUser; + protected final String commitUser; // Mainly for snapshot deduplication. // @@ -135,37 +135,37 @@ public class Snapshot { // If snapshot A has a smaller commitIdentifier than snapshot B, then snapshot A must be // committed before snapshot B, and thus snapshot A must contain older records than snapshot B. @JsonProperty(FIELD_COMMIT_IDENTIFIER) - private final long commitIdentifier; + protected final long commitIdentifier; @JsonProperty(FIELD_COMMIT_KIND) - private final CommitKind commitKind; + protected final CommitKind commitKind; @JsonProperty(FIELD_TIME_MILLIS) - private final long timeMillis; + protected final long timeMillis; @JsonProperty(FIELD_LOG_OFFSETS) @JsonInclude(JsonInclude.Include.NON_NULL) @Nullable - private final Map logOffsets; + protected final Map logOffsets; // record count of all changes occurred in this snapshot // null for paimon <= 0.3 @JsonProperty(FIELD_TOTAL_RECORD_COUNT) @Nullable - private final Long totalRecordCount; + protected final Long totalRecordCount; // record count of all new changes occurred in this snapshot // null for paimon <= 0.3 @JsonProperty(FIELD_DELTA_RECORD_COUNT) @Nullable - private final Long deltaRecordCount; + protected final Long deltaRecordCount; // record count of all changelog produced in this snapshot // null for paimon <= 0.3 @JsonProperty(FIELD_CHANGELOG_RECORD_COUNT) @JsonInclude(JsonInclude.Include.NON_NULL) @Nullable - private final Long changelogRecordCount; + protected final Long changelogRecordCount; // watermark for input records // null for paimon <= 0.3 @@ -174,14 +174,14 @@ public class Snapshot { @JsonProperty(FIELD_WATERMARK) @JsonInclude(JsonInclude.Include.NON_NULL) @Nullable - private final Long watermark; + protected final Long watermark; // stats file name for statistics of this table // null if no stats file @JsonInclude(JsonInclude.Include.NON_NULL) @JsonProperty(FIELD_STATISTICS) @Nullable - private final String statistics; + protected final String statistics; public Snapshot( long id, diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java index 83508adf083f..e389a471c4ec 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java @@ -1231,7 +1231,7 @@ static ConflictCheck noConflictCheck() { return latestSnapshot -> false; } - static ConflictCheck mustConflictCheck() { + public static ConflictCheck mustConflictCheck() { return latestSnapshot -> true; } } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java index 70e33ddb32f4..871bba4affae 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java @@ -60,6 +60,7 @@ import java.io.IOException; import java.io.UncheckedIOException; +import java.time.Duration; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -461,8 +462,7 @@ public void rollbackTo(long snapshotId) { rollbackHelper().cleanLargerThan(snapshotManager.snapshot(snapshotId)); } - @Override - public void createTag(String tagName, long fromSnapshotId) { + public Snapshot createTagInternal(long fromSnapshotId) { SnapshotManager snapshotManager = snapshotManager(); Snapshot snapshot = null; if (snapshotManager.snapshotExists(fromSnapshotId)) { @@ -482,18 +482,36 @@ public void createTag(String tagName, long fromSnapshotId) { snapshot != null, "Cannot create tag because given snapshot #%s doesn't exist.", fromSnapshotId); - createTag(tagName, snapshot); + return snapshot; + } + + @Override + public void createTag(String tagName, long fromSnapshotId) { + createTag( + tagName, coreOptions().tagDefaultTimeRetained(), createTagInternal(fromSnapshotId)); + } + + @Override + public void createTag(String tagName, @Nullable Duration timeRetained, long fromSnapshotId) { + createTag(tagName, timeRetained, createTagInternal(fromSnapshotId)); } @Override public void createTag(String tagName) { Snapshot latestSnapshot = snapshotManager().latestSnapshot(); checkNotNull(latestSnapshot, "Cannot create tag because latest snapshot doesn't exist."); - createTag(tagName, latestSnapshot); + createTag(tagName, coreOptions().tagDefaultTimeRetained(), latestSnapshot); + } + + @Override + public void createTag(String tagName, @Nullable Duration timeRetained) { + Snapshot latestSnapshot = snapshotManager().latestSnapshot(); + checkNotNull(latestSnapshot, "Cannot create tag because latest snapshot doesn't exist."); + createTag(tagName, timeRetained, latestSnapshot); } - private void createTag(String tagName, Snapshot fromSnapshot) { - tagManager().createTag(fromSnapshot, tagName, store().createTagCallbacks()); + private void createTag(String tagName, @Nullable Duration timeRetained, Snapshot fromSnapshot) { + tagManager().createTag(fromSnapshot, tagName, timeRetained, store().createTagCallbacks()); } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java b/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java index d07035f2651e..be823156c0cd 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java @@ -18,6 +18,7 @@ package org.apache.paimon.table; +import org.apache.paimon.annotation.Experimental; import org.apache.paimon.stats.Statistics; import org.apache.paimon.table.sink.BatchWriteBuilder; import org.apache.paimon.table.sink.InnerTableCommit; @@ -25,6 +26,9 @@ import org.apache.paimon.table.sink.StreamWriteBuilder; import org.apache.paimon.table.source.InnerStreamTableScan; +import javax.annotation.Nullable; + +import java.time.Duration; import java.util.Collections; import java.util.List; import java.util.Map; @@ -109,6 +113,14 @@ default void createTag(String tagName, long fromSnapshotId) { this.getClass().getSimpleName())); } + @Experimental + default void createTag(String tagName, @Nullable Duration timeRetained, long fromSnapshotId) { + throw new UnsupportedOperationException( + String.format( + "Readonly Table %s does not support createTag.", + this.getClass().getSimpleName())); + } + @Override default void createTag(String tagName) { throw new UnsupportedOperationException( @@ -117,6 +129,14 @@ default void createTag(String tagName) { this.getClass().getSimpleName())); } + @Experimental + default void createTag(String tagName, @Nullable Duration timeRetained) { + throw new UnsupportedOperationException( + String.format( + "Readonly Table %s does not support createTag.", + this.getClass().getSimpleName())); + } + @Override default void deleteTag(String tagName) { throw new UnsupportedOperationException( diff --git a/paimon-core/src/main/java/org/apache/paimon/table/Table.java b/paimon-core/src/main/java/org/apache/paimon/table/Table.java index 3ed6a1990e5f..3d46d25a1a93 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/Table.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/Table.java @@ -26,7 +26,10 @@ import org.apache.paimon.table.source.ReadBuilder; import org.apache.paimon.types.RowType; +import javax.annotation.Nullable; + import java.io.Serializable; +import java.time.Duration; import java.util.List; import java.util.Map; import java.util.Optional; @@ -76,10 +79,16 @@ public interface Table extends Serializable { @Experimental void createTag(String tagName, long fromSnapshotId); + @Experimental + void createTag(String tagName, @Nullable Duration timeRetained, long fromSnapshotId); + /** Create a tag from latest snapshot. */ @Experimental void createTag(String tagName); + @Experimental + void createTag(String tagName, @Nullable Duration timeRetained); + /** Delete a tag by name. */ @Experimental void deleteTag(String tagName); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java index c63511ac4652..9d471ee5761f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java @@ -31,7 +31,7 @@ import org.apache.paimon.operation.Lock; import org.apache.paimon.operation.PartitionExpire; import org.apache.paimon.operation.metrics.CommitMetrics; -import org.apache.paimon.tag.TagAutoCreation; +import org.apache.paimon.tag.TagAutoManager; import org.apache.paimon.utils.ExecutorThreadFactory; import org.apache.paimon.utils.FileUtils; import org.apache.paimon.utils.IOUtils; @@ -76,7 +76,7 @@ public class TableCommitImpl implements InnerTableCommit { private final List commitCallbacks; @Nullable private final Runnable expireSnapshots; @Nullable private final PartitionExpire partitionExpire; - @Nullable private final TagAutoCreation tagAutoCreation; + @Nullable private final TagAutoManager tagAutoManager; private final Lock lock; @Nullable private final Duration consumerExpireTime; @@ -96,7 +96,7 @@ public TableCommitImpl( List commitCallbacks, @Nullable Runnable expireSnapshots, @Nullable PartitionExpire partitionExpire, - @Nullable TagAutoCreation tagAutoCreation, + @Nullable TagAutoManager tagAutoManager, Lock lock, @Nullable Duration consumerExpireTime, ConsumerManager consumerManager, @@ -112,7 +112,7 @@ public TableCommitImpl( this.commitCallbacks = commitCallbacks; this.expireSnapshots = expireSnapshots; this.partitionExpire = partitionExpire; - this.tagAutoCreation = tagAutoCreation; + this.tagAutoManager = tagAutoManager; this.lock = lock; this.consumerExpireTime = consumerExpireTime; @@ -131,8 +131,14 @@ public TableCommitImpl( } public boolean forceCreatingSnapshot() { - return this.forceCreatingSnapshot - || (tagAutoCreation != null && tagAutoCreation.forceCreatingSnapshot()); + if (this.forceCreatingSnapshot) { + return true; + } + if (tagAutoManager != null) { + return tagAutoManager.getTagAutoCreation() != null + && tagAutoManager.getTagAutoCreation().forceCreatingSnapshot(); + } + return false; } @Override @@ -349,8 +355,8 @@ private void expire(long partitionExpireIdentifier) { partitionExpire.expire(partitionExpireIdentifier); } - if (tagAutoCreation != null) { - tagAutoCreation.run(); + if (tagAutoManager != null) { + tagAutoManager.run(); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java index ebd8aa7af0db..e51ebcb0d64e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java @@ -19,7 +19,6 @@ package org.apache.paimon.table.system; import org.apache.paimon.CoreOptions; -import org.apache.paimon.Snapshot; import org.apache.paimon.data.BinaryString; import org.apache.paimon.data.GenericRow; import org.apache.paimon.data.InternalRow; @@ -39,6 +38,7 @@ import org.apache.paimon.table.source.ReadOnceTableScan; import org.apache.paimon.table.source.Split; import org.apache.paimon.table.source.TableRead; +import org.apache.paimon.tag.Tag; import org.apache.paimon.types.BigIntType; import org.apache.paimon.types.DataField; import org.apache.paimon.types.RowType; @@ -51,6 +51,7 @@ import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators; +import java.time.LocalDateTime; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -79,7 +80,10 @@ public class TagsTable implements ReadonlyTable { new DataField(2, "schema_id", new BigIntType(false)), new DataField(3, "commit_time", new TimestampType(false, 3)), new DataField(4, "record_count", new BigIntType(true)), - new DataField(5, "branches", SerializationUtils.newStringType(true)))); + new DataField(5, "branches", SerializationUtils.newStringType(true)), + new DataField(6, "create_time", new TimestampType(false, 3)), + new DataField( + 7, "time_retained", SerializationUtils.newStringType(true)))); private final FileIO fileIO; private final Path location; @@ -204,9 +208,9 @@ public RecordReader createReader(Split split) { Options options = new Options(); options.set(CoreOptions.PATH, location.toUri().toString()); FileStoreTable table = FileStoreTableFactory.create(fileIO, options); - SortedMap> tags = table.tagManager().tags(); - Map nameToSnapshot = new LinkedHashMap<>(); - for (Map.Entry> tag : tags.entrySet()) { + SortedMap> tags = table.tagManager().tagsWithTimeRetained(); + Map nameToSnapshot = new LinkedHashMap<>(); + for (Map.Entry> tag : tags.entrySet()) { for (String tagName : tag.getValue()) { nameToSnapshot.put(tagName, tag.getKey()); } @@ -234,17 +238,24 @@ public RecordReader createReader(Split split) { } private InternalRow toRow( - Map.Entry tag, Map> tagBranches) { - Snapshot snapshot = tag.getValue(); - List branches = tagBranches.get(tag.getKey()); + Map.Entry snapshot, Map> tagBranches) { + Tag tag = snapshot.getValue(); + List branches = tagBranches.get(snapshot.getKey()); return GenericRow.of( - BinaryString.fromString(tag.getKey()), - snapshot.id(), - snapshot.schemaId(), + BinaryString.fromString(snapshot.getKey()), + tag.id(), + tag.schemaId(), + Timestamp.fromLocalDateTime(DateTimeUtils.toLocalDateTime(tag.timeMillis())), + tag.totalRecordCount(), + BinaryString.fromString(branches == null ? "[]" : branches.toString()), Timestamp.fromLocalDateTime( - DateTimeUtils.toLocalDateTime(snapshot.timeMillis())), - snapshot.totalRecordCount(), - BinaryString.fromString(branches == null ? "[]" : branches.toString())); + tag.getTagCreateTime() == null + ? LocalDateTime.MIN + : tag.getTagCreateTime()), + BinaryString.fromString( + tag.getTagTimeRetained() == null + ? "" + : tag.getTagTimeRetained().toString())); } } } diff --git a/paimon-core/src/main/java/org/apache/paimon/tag/Tag.java b/paimon-core/src/main/java/org/apache/paimon/tag/Tag.java new file mode 100644 index 000000000000..58dbf430b45f --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/tag/Tag.java @@ -0,0 +1,355 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.tag; + +import org.apache.paimon.Snapshot; +import org.apache.paimon.fs.FileIO; +import org.apache.paimon.fs.Path; +import org.apache.paimon.utils.JsonSerdeUtil; + +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonInclude; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty; + +import javax.annotation.Nullable; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.time.Duration; +import java.time.LocalDateTime; +import java.util.Comparator; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; + +/** Snapshot with tagCreateTime and tagTimeRetained. */ +public class Tag extends Snapshot { + + public static final Comparator TAG_COMPARATOR = new TagComparator(); + + private static final String FIELD_TAG_CREATE_TIME = "tagCreateTime"; + private static final String FIELD_TAG_TIME_RETAINED = "tagTimeRetained"; + + @JsonInclude(JsonInclude.Include.NON_NULL) + @JsonProperty(FIELD_TAG_CREATE_TIME) + @Nullable + private final LocalDateTime tagCreateTime; + + @JsonInclude(JsonInclude.Include.NON_NULL) + @JsonProperty(FIELD_TAG_TIME_RETAINED) + @Nullable + private final Duration tagTimeRetained; + + @JsonCreator + public Tag( + @JsonProperty(FIELD_VERSION) @Nullable Integer version, + @JsonProperty(FIELD_ID) long id, + @JsonProperty(FIELD_SCHEMA_ID) long schemaId, + @JsonProperty(FIELD_BASE_MANIFEST_LIST) String baseManifestList, + @JsonProperty(FIELD_DELTA_MANIFEST_LIST) String deltaManifestList, + @JsonProperty(FIELD_CHANGELOG_MANIFEST_LIST) @Nullable String changelogManifestList, + @JsonProperty(FIELD_INDEX_MANIFEST) @Nullable String indexManifest, + @JsonProperty(FIELD_COMMIT_USER) String commitUser, + @JsonProperty(FIELD_COMMIT_IDENTIFIER) long commitIdentifier, + @JsonProperty(FIELD_COMMIT_KIND) CommitKind commitKind, + @JsonProperty(FIELD_TIME_MILLIS) long timeMillis, + @JsonProperty(FIELD_LOG_OFFSETS) Map logOffsets, + @JsonProperty(FIELD_TOTAL_RECORD_COUNT) @Nullable Long totalRecordCount, + @JsonProperty(FIELD_DELTA_RECORD_COUNT) @Nullable Long deltaRecordCount, + @JsonProperty(FIELD_CHANGELOG_RECORD_COUNT) @Nullable Long changelogRecordCount, + @JsonProperty(FIELD_WATERMARK) @Nullable Long watermark, + @JsonProperty(FIELD_STATISTICS) @Nullable String statistics, + @JsonProperty(FIELD_TAG_CREATE_TIME) @Nullable LocalDateTime tagCreateTime, + @JsonProperty(FIELD_TAG_TIME_RETAINED) @Nullable Duration tagTimeRetained) { + super( + version, + id, + schemaId, + baseManifestList, + deltaManifestList, + changelogManifestList, + indexManifest, + commitUser, + commitIdentifier, + commitKind, + timeMillis, + logOffsets, + totalRecordCount, + deltaRecordCount, + changelogRecordCount, + watermark, + statistics); + this.tagCreateTime = tagCreateTime; + this.tagTimeRetained = tagTimeRetained; + } + + @JsonGetter(FIELD_TAG_CREATE_TIME) + public @Nullable LocalDateTime getTagCreateTime() { + return tagCreateTime; + } + + @JsonGetter(FIELD_TAG_TIME_RETAINED) + public @Nullable Duration getTagTimeRetained() { + return tagTimeRetained; + } + + public String toJson() { + return JsonSerdeUtil.toJson(this); + } + + public static Tag fromJson(String json) { + return JsonSerdeUtil.fromJson(json, Tag.class); + } + + public static Tag fromPath(FileIO fileIO, Path path) { + try { + String json = fileIO.readFileUtf8(path); + return Tag.fromJson(json); + } catch (IOException e) { + throw new RuntimeException("Fails to read tag from path " + path, e); + } + } + + public static Optional safelyFromTagPath(FileIO fileIO, Path path) throws IOException { + try { + String json = fileIO.readFileUtf8(path); + return Optional.of(Tag.fromJson(json)); + } catch (FileNotFoundException e) { + return Optional.empty(); + } + } + + public static Tag fromSnapshotAndTagTtl( + Snapshot snapshot, Duration tagTimeRetained, LocalDateTime tagCreateTime) { + return new Tag( + snapshot.version(), + snapshot.id(), + snapshot.schemaId(), + snapshot.baseManifestList(), + snapshot.deltaManifestList(), + snapshot.changelogManifestList(), + snapshot.indexManifest(), + snapshot.commitUser(), + snapshot.commitIdentifier(), + snapshot.commitKind(), + snapshot.timeMillis(), + snapshot.logOffsets(), + snapshot.totalRecordCount(), + snapshot.deltaRecordCount(), + snapshot.changelogRecordCount(), + snapshot.watermark(), + snapshot.statistics(), + tagCreateTime, + tagTimeRetained); + } + + public Snapshot toSnapshot() { + return new Snapshot( + version, + id, + schemaId, + baseManifestList, + deltaManifestList, + changelogManifestList, + indexManifest, + commitUser, + commitIdentifier, + commitKind, + timeMillis, + logOffsets, + totalRecordCount, + deltaRecordCount, + changelogRecordCount, + watermark, + statistics); + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), tagCreateTime, tagTimeRetained); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + if (!super.equals(o)) { + return false; + } + Tag that = (Tag) o; + return Objects.equals(tagCreateTime, that.tagCreateTime) + && Objects.equals(tagTimeRetained, that.tagTimeRetained); + } + + private static class TagComparator implements Comparator { + @Override + public int compare(Tag tag1, Tag tag2) { + int comparisonResult = 0; + + // Compare id + comparisonResult = Long.compare(tag1.id, tag2.id); + if (comparisonResult != 0) { + return comparisonResult; + } + + // Compare tagCreateTime + if (tag1.tagCreateTime != null && tag2.tagCreateTime != null) { + comparisonResult = tag1.tagCreateTime.compareTo(tag2.tagCreateTime); + if (comparisonResult != 0) { + return comparisonResult; + } + } + + // Compare tagTimeRetained + if (tag1.tagTimeRetained != null && tag2.tagTimeRetained != null) { + comparisonResult = tag1.tagTimeRetained.compareTo(tag2.tagTimeRetained); + } + + // Compare version + if (tag1.version != null && tag2.version != null) { + comparisonResult = Integer.compare(tag1.version, tag2.version); + if (comparisonResult != 0) { + return comparisonResult; + } + } + + // Compare schemaId + comparisonResult = Long.compare(tag1.schemaId, tag2.schemaId); + if (comparisonResult != 0) { + return comparisonResult; + } + + // Compare baseManifestList + if (tag1.baseManifestList != null && tag2.baseManifestList != null) { + comparisonResult = tag1.baseManifestList.compareTo(tag2.baseManifestList); + if (comparisonResult != 0) { + return comparisonResult; + } + } + + // Compare deltaManifestList + if (tag1.deltaManifestList != null && tag2.deltaManifestList != null) { + comparisonResult = tag1.deltaManifestList.compareTo(tag2.deltaManifestList); + if (comparisonResult != 0) { + return comparisonResult; + } + } + + // Compare changelogManifestList + if (tag1.changelogManifestList != null && tag2.changelogManifestList != null) { + comparisonResult = tag1.changelogManifestList.compareTo(tag2.changelogManifestList); + if (comparisonResult != 0) { + return comparisonResult; + } + } + + // Compare indexManifest + if (tag1.indexManifest != null && tag2.indexManifest != null) { + comparisonResult = tag1.indexManifest.compareTo(tag2.indexManifest); + if (comparisonResult != 0) { + return comparisonResult; + } + } + + // Compare commitUser + if (tag1.commitUser != null && tag2.commitUser != null) { + comparisonResult = tag1.commitUser.compareTo(tag2.commitUser); + if (comparisonResult != 0) { + return comparisonResult; + } + } + + // Compare commitIdentifier + comparisonResult = Long.compare(tag1.commitIdentifier, tag2.commitIdentifier); + if (comparisonResult != 0) { + return comparisonResult; + } + + // Compare commitKind + if (tag1.commitKind != null && tag2.commitKind != null) { + comparisonResult = tag1.commitKind.compareTo(tag2.commitKind); + if (comparisonResult != 0) { + return comparisonResult; + } + } + + // Compare timeMillis + comparisonResult = Long.compare(tag1.timeMillis, tag2.timeMillis); + if (comparisonResult != 0) { + return comparisonResult; + } + + // Compare logOffsets + if (tag1.logOffsets != null && tag2.logOffsets != null) { + comparisonResult = Integer.compare(tag1.logOffsets.size(), tag2.logOffsets.size()); + if (comparisonResult != 0) { + return comparisonResult; + } + } + + // Compare totalRecordCount + if (tag1.totalRecordCount != null && tag2.totalRecordCount != null) { + comparisonResult = Long.compare(tag1.totalRecordCount, tag2.totalRecordCount); + if (comparisonResult != 0) { + return comparisonResult; + } + } + + // Compare deltaRecordCount + if (tag1.deltaRecordCount != null && tag2.deltaRecordCount != null) { + comparisonResult = Long.compare(tag1.deltaRecordCount, tag2.deltaRecordCount); + if (comparisonResult != 0) { + return comparisonResult; + } + } + + // Compare changelogRecordCount + if (tag1.changelogRecordCount != null && tag2.changelogRecordCount != null) { + comparisonResult = + Long.compare(tag1.changelogRecordCount, tag2.changelogRecordCount); + if (comparisonResult != 0) { + return comparisonResult; + } + } + + // Compare watermark + if (tag1.watermark != null && tag2.watermark != null) { + comparisonResult = Long.compare(tag1.watermark, tag2.watermark); + if (comparisonResult != 0) { + return comparisonResult; + } + } + + // Compare statistics + if (tag1.statistics != null && tag2.statistics != null) { + comparisonResult = tag1.statistics.compareTo(tag2.statistics); + if (comparisonResult != 0) { + return comparisonResult; + } + } + + return comparisonResult; + } + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java b/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java index 505454313bd2..41e1e8d16c0d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java +++ b/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java @@ -20,7 +20,6 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.Snapshot; -import org.apache.paimon.operation.TagDeletion; import org.apache.paimon.table.sink.TagCallback; import org.apache.paimon.tag.TagTimeExtractor.ProcessTimeExtractor; import org.apache.paimon.tag.TagTimeExtractor.WatermarkExtractor; @@ -46,11 +45,10 @@ public class TagAutoCreation { private final SnapshotManager snapshotManager; private final TagManager tagManager; - private final TagDeletion tagDeletion; private final TagTimeExtractor timeExtractor; private final TagPeriodHandler periodHandler; private final Duration delay; - private final Integer numRetainedMax; + private final Duration timeRetained; private final List callbacks; private final Duration idlenessTimeout; @@ -60,20 +58,18 @@ public class TagAutoCreation { private TagAutoCreation( SnapshotManager snapshotManager, TagManager tagManager, - TagDeletion tagDeletion, TagTimeExtractor timeExtractor, TagPeriodHandler periodHandler, Duration delay, - Integer numRetainedMax, + @Nullable Duration timeRetained, Duration idlenessTimeout, List callbacks) { this.snapshotManager = snapshotManager; this.tagManager = tagManager; - this.tagDeletion = tagDeletion; this.timeExtractor = timeExtractor; this.periodHandler = periodHandler; this.delay = delay; - this.numRetainedMax = numRetainedMax; + this.timeRetained = timeRetained; this.callbacks = callbacks; this.idlenessTimeout = idlenessTimeout; @@ -118,7 +114,7 @@ public boolean forceCreatingSnapshot() { public void run() { while (true) { if (snapshotManager.snapshotExists(nextSnapshot)) { - tryToTag(snapshotManager.snapshot(nextSnapshot)); + tryToCreateTags(snapshotManager.snapshot(nextSnapshot)); nextSnapshot++; } else { // avoid snapshot has been expired @@ -132,7 +128,7 @@ public void run() { } } - private void tryToTag(Snapshot snapshot) { + private void tryToCreateTags(Snapshot snapshot) { Optional timeOptional = timeExtractor.extract(snapshot.timeMillis(), snapshot.watermark()); if (!timeOptional.isPresent()) { @@ -144,28 +140,8 @@ private void tryToTag(Snapshot snapshot) { || isAfterOrEqual(time.minus(delay), periodHandler.nextTagTime(nextTag))) { LocalDateTime thisTag = periodHandler.normalizeToPreviousTag(time); String tagName = periodHandler.timeToTag(thisTag); - tagManager.createTag(snapshot, tagName, callbacks); + tagManager.createTag(snapshot, tagName, timeRetained, callbacks); nextTag = periodHandler.nextTagTime(thisTag); - - if (numRetainedMax != null) { - // only handle auto-created tags here - SortedMap> tags = tagManager.tags(periodHandler::isAutoTag); - if (tags.size() > numRetainedMax) { - int toDelete = tags.size() - numRetainedMax; - int i = 0; - for (List tag : tags.values()) { - tagManager.deleteTag( - checkAndGetOneAutoTag(tag), - tagDeletion, - snapshotManager, - callbacks); - i++; - if (i == toDelete) { - break; - } - } - } - } } } @@ -187,7 +163,6 @@ public static TagAutoCreation create( CoreOptions options, SnapshotManager snapshotManager, TagManager tagManager, - TagDeletion tagDeletion, List callbacks) { TagTimeExtractor extractor = TagTimeExtractor.createForAutoTag(options); if (extractor == null) { @@ -196,11 +171,10 @@ public static TagAutoCreation create( return new TagAutoCreation( snapshotManager, tagManager, - tagDeletion, extractor, TagPeriodHandler.create(options), options.tagCreationDelay(), - options.tagNumRetainedMax(), + options.tagDefaultTimeRetained(), options.snapshotWatermarkIdleTimeout(), callbacks); } diff --git a/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoExpire.java b/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoExpire.java new file mode 100644 index 000000000000..e31a926b5710 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoExpire.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.tag; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.Snapshot; +import org.apache.paimon.operation.TagDeletion; +import org.apache.paimon.table.sink.TagCallback; +import org.apache.paimon.utils.SnapshotManager; +import org.apache.paimon.utils.TagManager; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.time.LocalDateTime; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.SortedMap; + +/** A manager to expire tags. */ +public class TagAutoExpire { + + private static final Logger LOG = LoggerFactory.getLogger(TagAutoExpire.class); + + private final SnapshotManager snapshotManager; + private final TagManager tagManager; + private final TagDeletion tagDeletion; + private final TagPeriodHandler periodHandler; + private final Integer numRetainedMax; + private final List callbacks; + + private TagAutoExpire( + SnapshotManager snapshotManager, + TagManager tagManager, + TagDeletion tagDeletion, + TagPeriodHandler periodHandler, + Duration delay, + Integer numRetainedMax, + List callbacks) { + this.snapshotManager = snapshotManager; + this.tagManager = tagManager; + this.tagDeletion = tagDeletion; + this.periodHandler = periodHandler; + this.numRetainedMax = numRetainedMax; + this.callbacks = callbacks; + this.periodHandler.validateDelay(delay); + } + + public void run() { + Set deleteTags = new HashSet<>(); + deleteTags.addAll(getExpireTagsByNumRetainedMax()); + deleteTags.addAll(getExpireTagsByTimeRetained()); + deleteTags.forEach( + tag -> tagManager.deleteTag(tag, tagDeletion, snapshotManager, callbacks)); + } + + private Set getExpireTagsByNumRetainedMax() { + Set deleteTags = new HashSet<>(); + if (numRetainedMax != null) { + // only handle auto-created tags here + SortedMap> tags = tagManager.tags(periodHandler::isAutoTag); + if (tags.size() > numRetainedMax) { + int toDelete = tags.size() - numRetainedMax; + int i = 0; + for (List tag : tags.values()) { + String tagName = TagAutoCreation.checkAndGetOneAutoTag(tag); + LOG.info( + "Delete tag {}, because the number of auto-created tags reached numRetainedMax of {}.", + tagName, + numRetainedMax); + deleteTags.add(tagName); + i++; + if (i == toDelete) { + break; + } + } + } + } + return deleteTags; + } + + private Set getExpireTagsByTimeRetained() { + // handle auto-created and non-auto-created-tags here + Set deleteTags = new HashSet<>(); + SortedMap> tags = tagManager.tagsWithTimeRetained(); + for (Map.Entry> entry : tags.entrySet()) { + Tag tag = entry.getKey(); + LocalDateTime createTime = tag.getTagCreateTime(); + Duration timeRetained = tag.getTagTimeRetained(); + if (createTime == null || timeRetained == null) { + continue; + } + if (LocalDateTime.now().isAfter(createTime.plus(timeRetained))) { + for (String tagName : entry.getValue()) { + LOG.info( + "Delete tag {}, because its existence time has reached its timeRetained of {}.", + tagName, + timeRetained); + deleteTags.add(tagName); + } + } + } + return deleteTags; + } + + public static TagAutoExpire create( + CoreOptions options, + SnapshotManager snapshotManager, + TagManager tagManager, + TagDeletion tagDeletion, + List callbacks) { + return new TagAutoExpire( + snapshotManager, + tagManager, + tagDeletion, + TagPeriodHandler.create(options), + options.tagCreationDelay(), + options.tagNumRetainedMax(), + callbacks); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoManager.java b/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoManager.java new file mode 100644 index 000000000000..a991559479a9 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoManager.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.tag; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.operation.TagDeletion; +import org.apache.paimon.table.sink.TagCallback; +import org.apache.paimon.utils.SnapshotManager; +import org.apache.paimon.utils.TagManager; + +import java.util.List; + +/** A manager to create and expire tags. */ +public class TagAutoManager { + + private final TagAutoCreation tagAutoCreation; + private final TagAutoExpire tagAutoExpire; + + private TagAutoManager(TagAutoCreation tagAutoCreation, TagAutoExpire tagAutoExpire) { + this.tagAutoCreation = tagAutoCreation; + this.tagAutoExpire = tagAutoExpire; + } + + public void run() { + if (tagAutoCreation != null) { + tagAutoCreation.run(); + } + if (tagAutoExpire != null) { + tagAutoExpire.run(); + } + } + + public static TagAutoManager create( + CoreOptions options, + SnapshotManager snapshotManager, + TagManager tagManager, + TagDeletion tagDeletion, + List callbacks) { + TagTimeExtractor extractor = TagTimeExtractor.createForAutoTag(options); + + return new TagAutoManager( + extractor == null + ? null + : TagAutoCreation.create(options, snapshotManager, tagManager, callbacks), + TagAutoExpire.create(options, snapshotManager, tagManager, tagDeletion, callbacks)); + } + + public TagAutoCreation getTagAutoCreation() { + return tagAutoCreation; + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/JsonSerdeUtil.java b/paimon-core/src/main/java/org/apache/paimon/utils/JsonSerdeUtil.java index 676276a30e58..aa493dd4a90b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/JsonSerdeUtil.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/JsonSerdeUtil.java @@ -36,6 +36,7 @@ import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer; import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.module.SimpleModule; import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ser.std.StdSerializer; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; import javax.annotation.Nullable; @@ -57,6 +58,7 @@ public class JsonSerdeUtil { static { OBJECT_MAPPER_INSTANCE = new ObjectMapper(); OBJECT_MAPPER_INSTANCE.registerModule(createPaimonJacksonModule()); + OBJECT_MAPPER_INSTANCE.registerModule(new JavaTimeModule()); } public static LinkedHashMap parseJsonMap(String jsonString, Class valueType) { diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java index 134dea459f02..ce6f0202093d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java @@ -26,11 +26,16 @@ import org.apache.paimon.manifest.ManifestEntry; import org.apache.paimon.operation.TagDeletion; import org.apache.paimon.table.sink.TagCallback; +import org.apache.paimon.tag.Tag; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; + import java.io.IOException; +import java.time.Duration; +import java.time.LocalDateTime; import java.util.ArrayList; import java.util.Collection; import java.util.Comparator; @@ -75,7 +80,11 @@ public Path branchTagPath(String branchName, String tagName) { } /** Create a tag from given snapshot and save it in the storage. */ - public void createTag(Snapshot snapshot, String tagName, List callbacks) { + public void createTag( + Snapshot snapshot, + String tagName, + @Nullable Duration timeRetained, + List callbacks) { checkArgument(!StringUtils.isBlank(tagName), "Tag name '%s' is blank.", tagName); // skip create tag for the same snapshot of the same name. @@ -86,7 +95,13 @@ public void createTag(Snapshot snapshot, String tagName, List callb } else { Path newTagPath = tagPath(tagName); try { - fileIO.writeFileUtf8(newTagPath, snapshot.toJson()); + fileIO.writeFileUtf8( + newTagPath, + timeRetained != null + ? Tag.fromSnapshotAndTagTtl( + snapshot, timeRetained, LocalDateTime.now()) + .toJson() + : snapshot.toJson()); } catch (IOException e) { throw new RuntimeException( String.format( @@ -227,7 +242,7 @@ public boolean tagExists(String tagName) { /** Get the tagged snapshot by name. */ public Snapshot taggedSnapshot(String tagName) { checkArgument(tagExists(tagName), "Tag '%s' doesn't exist.", tagName); - return Snapshot.fromPath(fileIO, tagPath(tagName)); + return Tag.fromPath(fileIO, tagPath(tagName)).toSnapshot(); } public long tagCount() { @@ -243,25 +258,41 @@ public List taggedSnapshots() { return new ArrayList<>(tags().keySet()); } - /** Get all tagged snapshots with names sorted by snapshot id. */ + /** Get all tag sorted by Tag. */ + public SortedMap> tagsWithTimeRetained() { + return tagsWithFilter(tagName -> true); + } + + /** Get all tagged snapshots with tag names sorted by snapshot id. */ public SortedMap> tags() { return tags(tagName -> true); } + public SortedMap> tags(Predicate filter) { + SortedMap> sortedTagMap = + new TreeMap<>(Comparator.comparingLong(Snapshot::id)); + SortedMap> tags = tagsWithFilter(filter); + tags.forEach( + (key, value) -> + sortedTagMap + .computeIfAbsent(key.toSnapshot(), tagNames -> new ArrayList<>()) + .addAll(value)); + return sortedTagMap; + } + /** - * Retrieves a sorted map of snapshots filtered based on a provided predicate. The predicate - * determines which tag names should be included in the result. Only snapshots with tag names - * that pass the predicate test are included. + * Retrieves a sorted map of tags filtered based on a provided predicate. The predicate + * determines which tag names should be included in the result. Only tags with tag names that + * pass the predicate test are included. * - * @param filter A Predicate that tests each tag name. Snapshots with tag names that fail the - * test are excluded from the result. - * @return A sorted map of filtered snapshots keyed by their IDs, each associated with its tag - * name. - * @throws RuntimeException if an IOException occurs during retrieval of snapshots. + * @param filter A Predicate that tests each tag name. Tags with tag names that fail the test + * are excluded from the result. + * @return A sorted map of filtered tags keyed by Tag.TAG_COMPARATOR, each associated with its + * tag name. + * @throws RuntimeException if an IOException occurs during retrieval of tags. */ - public SortedMap> tags(Predicate filter) { - TreeMap> tags = - new TreeMap<>(Comparator.comparingLong(Snapshot::id)); + public SortedMap> tagsWithFilter(Predicate filter) { + TreeMap> tags = new TreeMap<>(Tag.TAG_COMPARATOR); try { List paths = listVersionedFileStatus(fileIO, tagDirectory(), TAG_PREFIX) @@ -276,10 +307,10 @@ public SortedMap> tags(Predicate filter) { } // If the tag file is not found, it might be deleted by // other processes, so just skip this tag - Snapshot.safelyFromPath(fileIO, path) + Tag.safelyFromTagPath(fileIO, path) .ifPresent( - snapshot -> - tags.computeIfAbsent(snapshot, s -> new ArrayList<>()) + tag -> + tags.computeIfAbsent(tag, s -> new ArrayList<>()) .add(tagName)); } } catch (IOException e) { diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java index de95819ad889..626c13c018fc 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java @@ -140,7 +140,11 @@ public void testMixedSnapshotAndTagDeletion() throws Exception { // create tags for each snapshot for (int id = 1; id <= latestSnapshotId; id++) { Snapshot snapshot = snapshotManager.snapshot(id); - tagManager.createTag(snapshot, "tag" + id, Collections.emptyList()); + tagManager.createTag( + snapshot, + "tag" + id, + store.options().tagDefaultTimeRetained(), + Collections.emptyList()); } // randomly expire snapshots diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java index 819b70d8a570..a9be8d3ac988 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java @@ -51,6 +51,7 @@ import java.nio.file.Files; import java.nio.file.Paths; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -262,7 +263,7 @@ public void testExpireWithExistingTags() throws Exception { // step 2: commit -A (by clean bucket 0) and create tag1 cleanBucket(store, gen.getPartition(gen.next()), 0); - createTag(snapshotManager.snapshot(2), "tag1"); + createTag(snapshotManager.snapshot(2), "tag1", store.options().tagDefaultTimeRetained()); assertThat(tagManager.tagExists("tag1")).isTrue(); // step 3: commit C to bucket 2 @@ -273,7 +274,7 @@ public void testExpireWithExistingTags() throws Exception { // step 4: commit -B (by clean bucket 1) and create tag2 cleanBucket(store, partition, 1); - createTag(snapshotManager.snapshot(4), "tag2"); + createTag(snapshotManager.snapshot(4), "tag2", store.options().tagDefaultTimeRetained()); assertThat(tagManager.tagExists("tag2")).isTrue(); // step 5: commit D to bucket 3 @@ -353,7 +354,7 @@ public void testExpireWithUpgradeAndTags() throws Exception { // snapshot 3: commit -A (by clean bucket 0) cleanBucket(store, gen.getPartition(gen.next()), 0); - createTag(snapshotManager.snapshot(1), "tag1"); + createTag(snapshotManager.snapshot(1), "tag1", store.options().tagDefaultTimeRetained()); store.newExpire(1, 1, Long.MAX_VALUE).expire(); // check data file and manifests @@ -410,7 +411,7 @@ public void testDeleteTagWithSnapshot() throws Exception { Arrays.asList(snapshot1.baseManifestList(), snapshot1.deltaManifestList()); // create tag1 - createTag(snapshot1, "tag1"); + createTag(snapshot1, "tag1", store.options().tagDefaultTimeRetained()); // expire snapshot 1, 2 store.newExpire(1, 1, Long.MAX_VALUE).expire(); @@ -485,9 +486,9 @@ public void testDeleteTagWithOtherTag() throws Exception { Arrays.asList(snapshot2.baseManifestList(), snapshot2.deltaManifestList()); // create tags - createTag(snapshotManager.snapshot(1), "tag1"); - createTag(snapshotManager.snapshot(2), "tag2"); - createTag(snapshotManager.snapshot(4), "tag3"); + createTag(snapshotManager.snapshot(1), "tag1", store.options().tagDefaultTimeRetained()); + createTag(snapshotManager.snapshot(2), "tag2", store.options().tagDefaultTimeRetained()); + createTag(snapshotManager.snapshot(4), "tag3", store.options().tagDefaultTimeRetained()); // expire snapshot 1, 2, 3, 4 store.newExpire(1, 1, Long.MAX_VALUE).expire(); @@ -735,7 +736,7 @@ private void cleanBucket(TestFileStore store, BinaryRow partition, int bucket) { null); } - private void createTag(Snapshot snapshot, String tagName) { - tagManager.createTag(snapshot, tagName, Collections.emptyList()); + private void createTag(Snapshot snapshot, String tagName, Duration timeRetained) { + tagManager.createTag(snapshot, tagName, timeRetained, Collections.emptyList()); } } diff --git a/paimon-core/src/test/java/org/apache/paimon/table/system/TagsTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/system/TagsTableTest.java index 51cfcd702f8c..b0d357853b6e 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/system/TagsTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/system/TagsTableTest.java @@ -18,7 +18,6 @@ package org.apache.paimon.table.system; -import org.apache.paimon.Snapshot; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.data.BinaryString; import org.apache.paimon.data.GenericRow; @@ -30,6 +29,7 @@ import org.apache.paimon.table.Table; import org.apache.paimon.table.TableTestBase; import org.apache.paimon.table.sink.TableCommitImpl; +import org.apache.paimon.tag.Tag; import org.apache.paimon.types.DataTypes; import org.apache.paimon.utils.DateTimeUtils; import org.apache.paimon.utils.TagManager; @@ -120,19 +120,27 @@ void testTagBranchesTable() throws Exception { private List getExceptedResult( Function> tagBranchesFunction) { List internalRows = new ArrayList<>(); - for (Map.Entry> tag : tagManager.tags().entrySet()) { - Snapshot snapshot = tag.getKey(); - for (String tagName : tag.getValue()) { + for (Map.Entry> snapshot : tagManager.tagsWithTimeRetained().entrySet()) { + Tag tag = snapshot.getKey(); + for (String tagName : snapshot.getValue()) { internalRows.add( GenericRow.of( BinaryString.fromString(tagName), - snapshot.id(), - snapshot.schemaId(), + tag.id(), + tag.schemaId(), Timestamp.fromLocalDateTime( - DateTimeUtils.toLocalDateTime(snapshot.timeMillis())), - snapshot.totalRecordCount(), + DateTimeUtils.toLocalDateTime(tag.timeMillis())), + tag.totalRecordCount(), BinaryString.fromString( - tagBranchesFunction.apply(tagName).toString()))); + tagBranchesFunction.apply(tagName).toString()), + Timestamp.fromLocalDateTime( + tag.getTagCreateTime() == null + ? LocalDateTime.MIN + : tag.getTagCreateTime()), + BinaryString.fromString( + tag.getTagTimeRetained() == null + ? "" + : tag.getTagTimeRetained().toString()))); } } return internalRows; diff --git a/paimon-core/src/test/java/org/apache/paimon/tag/TagAutoCreationTest.java b/paimon-core/src/test/java/org/apache/paimon/tag/TagAutoManagerTest.java similarity index 78% rename from paimon-core/src/test/java/org/apache/paimon/tag/TagAutoCreationTest.java rename to paimon-core/src/test/java/org/apache/paimon/tag/TagAutoManagerTest.java index f76a58e6cfd1..a065945bad01 100644 --- a/paimon-core/src/test/java/org/apache/paimon/tag/TagAutoCreationTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/tag/TagAutoManagerTest.java @@ -21,6 +21,7 @@ import org.apache.paimon.CoreOptions.TagCreationMode; import org.apache.paimon.CoreOptions.TagCreationPeriod; import org.apache.paimon.CoreOptions.TagPeriodFormatter; +import org.apache.paimon.Snapshot; import org.apache.paimon.catalog.PrimaryKeyTableTestBase; import org.apache.paimon.manifest.ManifestCommittable; import org.apache.paimon.options.Options; @@ -33,6 +34,7 @@ import java.time.Duration; import java.time.LocalDateTime; import java.time.ZoneId; +import java.util.Collections; import static org.apache.paimon.CoreOptions.SINK_WATERMARK_TIME_ZONE; import static org.apache.paimon.CoreOptions.SNAPSHOT_NUM_RETAINED_MAX; @@ -41,12 +43,13 @@ import static org.apache.paimon.CoreOptions.TAG_AUTOMATIC_CREATION; import static org.apache.paimon.CoreOptions.TAG_CREATION_DELAY; import static org.apache.paimon.CoreOptions.TAG_CREATION_PERIOD; +import static org.apache.paimon.CoreOptions.TAG_DEFAULT_TIME_RETAINED; import static org.apache.paimon.CoreOptions.TAG_NUM_RETAINED_MAX; import static org.apache.paimon.CoreOptions.TAG_PERIOD_FORMATTER; import static org.assertj.core.api.Assertions.assertThat; /** Test for tag automatic creation. */ -public class TagAutoCreationTest extends PrimaryKeyTableTestBase { +public class TagAutoManagerTest extends PrimaryKeyTableTestBase { @Test public void testTag() throws Exception { @@ -320,6 +323,104 @@ public void testWatermarkIdleTimeoutForceCreatingSnapshot() throws Exception { commit.close(); } + @Test + public void testAutoCreateTagNotExpiredByTimeRetained() throws Exception { + Options options = new Options(); + options.set(TAG_AUTOMATIC_CREATION, TagCreationMode.WATERMARK); + options.set(TAG_CREATION_PERIOD, TagCreationPeriod.HOURLY); + options.set(TAG_NUM_RETAINED_MAX, 3); + options.set(TAG_DEFAULT_TIME_RETAINED, Duration.ofMillis(500)); + FileStoreTable table = this.table.copy(options.toMap()); + TableCommitImpl commit = table.newCommit(commitUser).ignoreEmptyCommit(false); + TagManager tagManager = table.store().newTagManager(); + + // test normal creation + commit.commit(new ManifestCommittable(0, utcMills("2023-07-18T12:12:00"))); + commit.commit(new ManifestCommittable(1, utcMills("2023-07-18T14:00:00"))); + commit.commit(new ManifestCommittable(2, utcMills("2023-07-18T15:12:00"))); + commit.commit(new ManifestCommittable(3, utcMills("2023-07-18T16:00:00"))); + + // test expire old tag by time-retained + Thread.sleep(1000); + commit.commit(new ManifestCommittable(4, utcMills("2023-07-18T19:00:00"))); + assertThat(tagManager.allTagNames()).containsOnly("2023-07-18 18"); + + commit.close(); + } + + @Test + public void testExpireTagsByTimeRetained() throws Exception { + Options options = new Options(); + options.set(TAG_AUTOMATIC_CREATION, TagCreationMode.WATERMARK); + options.set(TAG_CREATION_PERIOD, TagCreationPeriod.HOURLY); + options.set(TAG_NUM_RETAINED_MAX, 3); + options.set(TAG_DEFAULT_TIME_RETAINED, Duration.ofMillis(500)); + FileStoreTable table = this.table.copy(options.toMap()); + TableCommitImpl commit = table.newCommit(commitUser).ignoreEmptyCommit(false); + TagManager tagManager = table.store().newTagManager(); + + // test normal creation + commit.commit(new ManifestCommittable(0, utcMills("2023-07-18T12:12:00"))); + commit.commit(new ManifestCommittable(1, utcMills("2023-07-18T14:00:00"))); + commit.commit(new ManifestCommittable(2, utcMills("2023-07-18T15:12:00"))); + commit.commit(new ManifestCommittable(3, utcMills("2023-07-18T16:00:00"))); + + Snapshot snapshot1 = + new Snapshot( + 4, + 0L, + null, + null, + null, + null, + null, + 0L, + Snapshot.CommitKind.APPEND, + 1000, + null, + null, + null, + null, + null, + null); + tagManager.createTag( + snapshot1, + "non-auto-create-tag-shoule-expire", + Duration.ofMillis(500), + Collections.emptyList()); + + Snapshot snapshot2 = + new Snapshot( + 5, + 0L, + null, + null, + null, + null, + null, + 0L, + Snapshot.CommitKind.APPEND, + 1000, + null, + null, + null, + null, + null, + null); + tagManager.createTag( + snapshot2, + "non-auto-create-tag-shoule-not-expire", + Duration.ofDays(1), + Collections.emptyList()); + + // test expire old tag by time-retained + Thread.sleep(1000); + commit.commit(new ManifestCommittable(6, utcMills("2023-07-18T19:00:00"))); + assertThat(tagManager.allTagNames()) + .containsOnly("2023-07-18 18", "non-auto-create-tag-shoule-not-expire"); + commit.close(); + } + private long localZoneMills(String timestamp) { return LocalDateTime.parse(timestamp) .atZone(ZoneId.systemDefault()) diff --git a/paimon-core/src/test/java/org/apache/paimon/tag/TagTest.java b/paimon-core/src/test/java/org/apache/paimon/tag/TagTest.java new file mode 100644 index 000000000000..922055983b04 --- /dev/null +++ b/paimon-core/src/test/java/org/apache/paimon/tag/TagTest.java @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.tag; + +import org.apache.paimon.Snapshot; + +import org.junit.Assert; +import org.junit.Test; + +import java.time.Duration; +import java.time.LocalDateTime; +import java.util.ArrayList; +import java.util.List; + +/** Test for {@link Tag}. */ +public class TagTest { + + private final Snapshot snapshot = + new Snapshot( + 0, + 0L, + null, + null, + null, + null, + null, + 0L, + Snapshot.CommitKind.APPEND, + 1000, + null, + null, + null, + null, + null, + null); + + @Test + public void testFromJson() { + Tag tag = Tag.fromJson(snapshot.toJson()); + Assert.assertEquals( + "{\n" + + " \"version\" : 3,\n" + + " \"id\" : 0,\n" + + " \"schemaId\" : 0,\n" + + " \"baseManifestList\" : null,\n" + + " \"deltaManifestList\" : null,\n" + + " \"changelogManifestList\" : null,\n" + + " \"commitUser\" : null,\n" + + " \"commitIdentifier\" : 0,\n" + + " \"commitKind\" : \"APPEND\",\n" + + " \"timeMillis\" : 1000,\n" + + " \"totalRecordCount\" : null,\n" + + " \"deltaRecordCount\" : null\n" + + "}", + tag.toJson()); + } + + @Test + public void testFromSnapshotAndTagTtl() { + Tag tag = + Tag.fromSnapshotAndTagTtl( + snapshot, + Duration.ofSeconds(5), + LocalDateTime.of(1969, 1, 1, 0, 0, 0, 123456789)); + String tagJson = tag.toJson(); + Assert.assertEquals( + "{\n" + + " \"version\" : 3,\n" + + " \"id\" : 0,\n" + + " \"schemaId\" : 0,\n" + + " \"baseManifestList\" : null,\n" + + " \"deltaManifestList\" : null,\n" + + " \"changelogManifestList\" : null,\n" + + " \"commitUser\" : null,\n" + + " \"commitIdentifier\" : 0,\n" + + " \"commitKind\" : \"APPEND\",\n" + + " \"timeMillis\" : 1000,\n" + + " \"totalRecordCount\" : null,\n" + + " \"deltaRecordCount\" : null,\n" + + " \"tagCreateTime\" : [ 1969, 1, 1, 0, 0, 0, 123456789 ],\n" + + " \"tagTimeRetained\" : 5.000000000\n" + + "}", + tagJson); + + Tag newTag = Tag.fromJson(tagJson); + Assert.assertEquals(tag, newTag); + } + + @Test + public void testTagComparator() { + Tag tag1 = + new Tag( + 3, + 2L, + 0, + null, + null, + null, + null, + null, + 8, + Snapshot.CommitKind.APPEND, + 1000, + null, + null, + null, + null, + null, + null, + LocalDateTime.now(), + Duration.ofSeconds(10)); + + Tag tag2 = + new Tag( + 3, + 1L, + 0, + null, + null, + null, + null, + null, + 8, + Snapshot.CommitKind.APPEND, + 1000, + null, + null, + null, + null, + null, + null, + LocalDateTime.now(), + Duration.ofSeconds(10)); + + Tag tag3 = + new Tag( + 3, + 0L, + 0, + null, + null, + null, + null, + null, + 8, + Snapshot.CommitKind.APPEND, + 1000, + null, + null, + null, + null, + null, + null, + LocalDateTime.now(), + Duration.ofSeconds(10)); + List tags = new ArrayList<>(); + tags.add(tag1); + tags.add(tag2); + tags.add(tag3); + tags.sort(Tag.TAG_COMPARATOR); + Assert.assertEquals(0, tags.get(0).id()); + Assert.assertEquals(1, tags.get(1).id()); + Assert.assertEquals(2, tags.get(2).id()); + } +} diff --git a/paimon-core/src/test/java/org/apache/paimon/utils/TagManagerTest.java b/paimon-core/src/test/java/org/apache/paimon/utils/TagManagerTest.java new file mode 100644 index 000000000000..12f38931ded1 --- /dev/null +++ b/paimon-core/src/test/java/org/apache/paimon/utils/TagManagerTest.java @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.utils; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.KeyValue; +import org.apache.paimon.Snapshot; +import org.apache.paimon.TestFileStore; +import org.apache.paimon.TestKeyValueGenerator; +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.fs.FileIO; +import org.apache.paimon.fs.Path; +import org.apache.paimon.fs.local.LocalFileIO; +import org.apache.paimon.mergetree.compact.DeduplicateMergeFunction; +import org.apache.paimon.operation.FileStoreTestUtils; +import org.apache.paimon.schema.Schema; +import org.apache.paimon.schema.SchemaManager; +import org.apache.paimon.schema.TableSchema; +import org.apache.paimon.tag.Tag; +import org.apache.paimon.types.RowType; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.time.Duration; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.SortedMap; +import java.util.concurrent.ThreadLocalRandom; + +import static org.apache.paimon.operation.FileStoreTestUtils.commitData; +import static org.apache.paimon.operation.FileStoreTestUtils.partitionedData; +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for TagManager. */ +public class TagManagerTest { + + @TempDir java.nio.file.Path tempDir; + + private final FileIO fileIO = new LocalFileIO(); + + private long commitIdentifier; + private String root; + private TagManager tagManager; + + @BeforeEach + public void setup() throws Exception { + commitIdentifier = 0L; + root = tempDir.toString(); + tagManager = null; + } + + @Test + public void testCreateTagWithoutTimeRetained() throws Exception { + TestFileStore store = createStore(TestKeyValueGenerator.GeneratorMode.NON_PARTITIONED, 4); + tagManager = new TagManager(fileIO, store.options().path()); + SnapshotManager snapshotManager = store.snapshotManager(); + TestKeyValueGenerator gen = + new TestKeyValueGenerator(TestKeyValueGenerator.GeneratorMode.NON_PARTITIONED); + BinaryRow partition = gen.getPartition(gen.next()); + + // commit A to bucket 0 and B to bucket 1 + Map>> writers = new HashMap<>(); + for (int bucket : Arrays.asList(0, 1)) { + List kvs = partitionedData(5, gen); + writeData(store, kvs, partition, bucket, writers); + } + commitData(store, commitIdentifier++, writers); + + tagManager.createTag( + snapshotManager.snapshot(1), + "tag", + store.options().tagDefaultTimeRetained(), + Collections.emptyList()); + assertThat(tagManager.tagExists("tag")).isTrue(); + Snapshot snapshot = tagManager.taggedSnapshot("tag"); + String snapshotJson = snapshot.toJson(); + Assertions.assertTrue( + !snapshotJson.contains("tagCreateTime") + && !snapshotJson.contains("tagTimeRetained")); + Assertions.assertEquals(snapshot, Snapshot.fromJson(snapshotJson)); + } + + @Test + public void testCreateTagWithTimeRetained() throws Exception { + TestFileStore store = createStore(TestKeyValueGenerator.GeneratorMode.NON_PARTITIONED, 4); + tagManager = new TagManager(fileIO, store.options().path()); + SnapshotManager snapshotManager = store.snapshotManager(); + TestKeyValueGenerator gen = + new TestKeyValueGenerator(TestKeyValueGenerator.GeneratorMode.NON_PARTITIONED); + BinaryRow partition = gen.getPartition(gen.next()); + + // commit A to bucket 0 and B to bucket 1 + Map>> writers = new HashMap<>(); + for (int bucket : Arrays.asList(0, 1)) { + List kvs = partitionedData(5, gen); + writeData(store, kvs, partition, bucket, writers); + } + commitData(store, commitIdentifier++, writers); + + tagManager.createTag( + snapshotManager.snapshot(1), "tag", Duration.ofDays(1), Collections.emptyList()); + assertThat(tagManager.tagExists("tag")).isTrue(); + SortedMap> tagsWithTimeRetained = tagManager.tagsWithTimeRetained(); + Assertions.assertEquals(1, tagsWithTimeRetained.size()); + Tag tag = tagsWithTimeRetained.firstKey(); + String tagJson = tag.toJson(); + Assertions.assertTrue( + tagJson.contains("tagCreateTime") && tagJson.contains("tagTimeRetained")); + Assertions.assertEquals(tag, Tag.fromJson(tagJson)); + assertThat(tagsWithTimeRetained.get(tag)).contains("tag"); + } + + private TestFileStore createStore(TestKeyValueGenerator.GeneratorMode mode, int buckets) + throws Exception { + ThreadLocalRandom random = ThreadLocalRandom.current(); + + CoreOptions.ChangelogProducer changelogProducer; + if (random.nextBoolean()) { + changelogProducer = CoreOptions.ChangelogProducer.INPUT; + } else { + changelogProducer = CoreOptions.ChangelogProducer.NONE; + } + + RowType rowType, partitionType; + switch (mode) { + case NON_PARTITIONED: + rowType = TestKeyValueGenerator.NON_PARTITIONED_ROW_TYPE; + partitionType = TestKeyValueGenerator.NON_PARTITIONED_PART_TYPE; + break; + case SINGLE_PARTITIONED: + rowType = TestKeyValueGenerator.SINGLE_PARTITIONED_ROW_TYPE; + partitionType = TestKeyValueGenerator.SINGLE_PARTITIONED_PART_TYPE; + break; + case MULTI_PARTITIONED: + rowType = TestKeyValueGenerator.DEFAULT_ROW_TYPE; + partitionType = TestKeyValueGenerator.DEFAULT_PART_TYPE; + break; + default: + throw new UnsupportedOperationException("Unsupported generator mode: " + mode); + } + + SchemaManager schemaManager = new SchemaManager(fileIO, new Path(root)); + TableSchema tableSchema = + schemaManager.createTable( + new Schema( + rowType.getFields(), + partitionType.getFieldNames(), + TestKeyValueGenerator.getPrimaryKeys(mode), + Collections.emptyMap(), + null)); + + return new TestFileStore.Builder( + "avro", + root, + buckets, + partitionType, + TestKeyValueGenerator.KEY_TYPE, + rowType, + TestKeyValueGenerator.TestKeyValueFieldsExtractor.EXTRACTOR, + DeduplicateMergeFunction.factory(), + tableSchema) + .changelogProducer(changelogProducer) + .build(); + } + + private void writeData( + TestFileStore store, + List kvs, + BinaryRow partition, + int bucket, + Map>> writers) + throws Exception { + writers.computeIfAbsent(partition, p -> new HashMap<>()) + .put(bucket, FileStoreTestUtils.writeData(store, kvs, partition, bucket)); + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagAction.java index 0e500c3e7488..f1455d3774e4 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagAction.java @@ -18,12 +18,16 @@ package org.apache.paimon.flink.action; +import javax.annotation.Nullable; + +import java.time.Duration; import java.util.Map; /** Create tag action for Flink. */ public class CreateTagAction extends TableActionBase { private final String tagName; + private final Duration timeRetained; private final Long snapshotId; public CreateTagAction( @@ -32,18 +36,20 @@ public CreateTagAction( String tableName, Map catalogConfig, String tagName, + @Nullable Duration timeRetained, Long snapshotId) { super(warehouse, databaseName, tableName, catalogConfig); this.tagName = tagName; + this.timeRetained = timeRetained; this.snapshotId = snapshotId; } @Override public void run() throws Exception { if (snapshotId == null) { - table.createTag(tagName); + table.createTag(tagName, timeRetained); } else { - table.createTag(tagName, snapshotId); + table.createTag(tagName, timeRetained, snapshotId); } } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagActionFactory.java index 01bbdba42abf..6cca76cb141e 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagActionFactory.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagActionFactory.java @@ -18,8 +18,11 @@ package org.apache.paimon.flink.action; +import org.apache.paimon.utils.TimeUtils; + import org.apache.flink.api.java.tuple.Tuple3; +import java.time.Duration; import java.util.Map; import java.util.Optional; @@ -30,6 +33,7 @@ public class CreateTagActionFactory implements ActionFactory { private static final String TAG_NAME = "tag_name"; private static final String SNAPSHOT = "snapshot"; + private static final String TIME_RETAINED = "time_retained"; @Override public String identifier() { @@ -49,9 +53,20 @@ public Optional create(MultipleParameterToolAdapter params) { snapshot = Long.parseLong(params.get(SNAPSHOT)); } + Duration timeRetained = null; + if (params.has(TIME_RETAINED)) { + timeRetained = TimeUtils.parseDuration(params.get(TIME_RETAINED)); + } + CreateTagAction action = new CreateTagAction( - tablePath.f0, tablePath.f1, tablePath.f2, catalogConfig, tagName, snapshot); + tablePath.f0, + tablePath.f1, + tablePath.f2, + catalogConfig, + tagName, + timeRetained, + snapshot); return Optional.of(action); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateTagProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateTagProcedure.java index 9999fdf11776..3bdc2e07edc3 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateTagProcedure.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateTagProcedure.java @@ -21,6 +21,7 @@ import org.apache.paimon.catalog.Catalog; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.table.Table; +import org.apache.paimon.utils.TimeUtils; import org.apache.flink.table.procedure.ProcedureContext; @@ -30,7 +31,7 @@ * Create tag procedure. Usage: * *

- *  CALL sys.create_tag('tableId', 'tagName', snapshotId)
+ *  CALL sys.create_tag('tableId', 'tagName', 'timeRetained', snapshotId)
  * 
*/ public class CreateTagProcedure extends ProcedureBase { @@ -38,23 +39,29 @@ public class CreateTagProcedure extends ProcedureBase { public static final String IDENTIFIER = "create_tag"; public String[] call( - ProcedureContext procedureContext, String tableId, String tagName, long snapshotId) + ProcedureContext procedureContext, + String tableId, + String tagName, + String timeRetained, + long snapshotId) throws Catalog.TableNotExistException { - return innerCall(tableId, tagName, snapshotId); + return innerCall(tableId, tagName, timeRetained, snapshotId); } - public String[] call(ProcedureContext procedureContext, String tableId, String tagName) + public String[] call( + ProcedureContext procedureContext, String tableId, String tagName, String timeRetained) throws Catalog.TableNotExistException { - return innerCall(tableId, tagName, null); + return innerCall(tableId, tagName, timeRetained, null); } - private String[] innerCall(String tableId, String tagName, @Nullable Long snapshotId) + private String[] innerCall( + String tableId, String tagName, String timeRetained, @Nullable Long snapshotId) throws Catalog.TableNotExistException { Table table = catalog.getTable(Identifier.fromString(tableId)); if (snapshotId == null) { - table.createTag(tagName); + table.createTag(tagName, TimeUtils.parseDuration(timeRetained)); } else { - table.createTag(tagName, snapshotId); + table.createTag(tagName, TimeUtils.parseDuration(timeRetained), snapshotId); } return new String[] {"Success"}; } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperator.java index dcf2c8b0045c..6d27c6019483 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperator.java @@ -46,6 +46,7 @@ import org.apache.flink.streaming.runtime.tasks.StreamTask; import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus; +import java.time.Duration; import java.util.ArrayList; import java.util.List; import java.util.NavigableSet; @@ -76,6 +77,8 @@ public class AutoTagForSavepointCommitterOperator private final NavigableSet identifiersForTags; + private final Duration tagTimeRetained; + private transient SnapshotManager snapshotManager; private transient TagManager tagManager; @@ -91,13 +94,15 @@ public AutoTagForSavepointCommitterOperator( SerializableSupplier snapshotManagerFactory, SerializableSupplier tagManagerFactory, SerializableSupplier tagDeletionFactory, - SerializableSupplier> callbacksSupplier) { + SerializableSupplier> callbacksSupplier, + Duration tagTimeRetained) { this.commitOperator = commitOperator; this.tagManagerFactory = tagManagerFactory; this.snapshotManagerFactory = snapshotManagerFactory; this.tagDeletionFactory = tagDeletionFactory; this.callbacksSupplier = callbacksSupplier; this.identifiersForTags = new TreeSet<>(); + this.tagTimeRetained = tagTimeRetained; } @Override @@ -167,7 +172,7 @@ private void createTagForIdentifiers(List identifiers) { for (Snapshot snapshot : snapshotForTags) { String tagName = SAVEPOINT_TAG_PREFIX + snapshot.commitIdentifier(); if (!tagManager.tagExists(tagName)) { - tagManager.createTag(snapshot, tagName, callbacks); + tagManager.createTag(snapshot, tagName, tagTimeRetained, callbacks); } } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperator.java index 2c898831ec2c..23202b45077f 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperator.java @@ -118,7 +118,11 @@ private void createTag() { tagName, tagDeletion, snapshotManager, table.store().createTagCallbacks()); } // Create a new tag - tagManager.createTag(snapshot, tagName, table.store().createTagCallbacks()); + tagManager.createTag( + snapshot, + tagName, + table.coreOptions().tagDefaultTimeRetained(), + table.store().createTagCallbacks()); // Expire the tag expireTag(); } catch (Exception e) { diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java index f7ebf5afdf24..fa4526897bcd 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java @@ -238,7 +238,8 @@ protected DataStreamSink doCommit(DataStream written, String com table::snapshotManager, table::tagManager, () -> table.store().newTagDeletion(), - () -> table.store().createTagCallbacks()); + () -> table.store().createTagCallbacks(), + table.coreOptions().tagDefaultTimeRetained()); } if (conf.get(ExecutionOptions.RUNTIME_MODE) == RuntimeExecutionMode.BATCH && table.coreOptions().tagCreationMode() == TagCreationMode.BATCH) { diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java index 8d445ab95b07..eb6f70f655d3 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java @@ -36,6 +36,7 @@ /** IT cases for branch management actions. */ class BranchActionITCase extends ActionITCaseBase { + @Test void testCreateAndDeleteBranch() throws Exception { @@ -63,7 +64,8 @@ void testCreateAndDeleteBranch() throws Exception { TagManager tagManager = new TagManager(table.fileIO(), table.location()); callProcedure( - String.format("CALL sys.create_tag('%s.%s', 'tag2', 2)", database, tableName)); + String.format( + "CALL sys.create_tag('%s.%s', 'tag2', '5 d', 2)", database, tableName)); assertThat(tagManager.tagExists("tag2")).isTrue(); BranchManager branchManager = table.branchManager(); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/TagActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/TagActionITCase.java index a01849ed94ca..a650ad31b347 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/TagActionITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/TagActionITCase.java @@ -83,7 +83,8 @@ public void testCreateAndDeleteTag() throws Exception { .run(); } else { callProcedure( - String.format("CALL sys.create_tag('%s.%s', 'tag2', 2)", database, tableName)); + String.format( + "CALL sys.create_tag('%s.%s', 'tag2', '5 d', 2)", database, tableName)); } assertThat(tagManager.tagExists("tag2")).isTrue(); @@ -153,7 +154,8 @@ public void testCreateLatestTag() throws Exception { .run(); } else { callProcedure( - String.format("CALL sys.create_tag('%s.%s', 'tag2', 2)", database, tableName)); + String.format( + "CALL sys.create_tag('%s.%s', 'tag2', '5 d', 2)", database, tableName)); } assertThat(tagManager.tagExists("tag2")).isTrue(); } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperatorTest.java index 0dbde05795fd..3b58c24d16b1 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperatorTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperatorTest.java @@ -208,7 +208,8 @@ protected OneInputStreamOperator createCommitterOperat table::snapshotManager, table::tagManager, () -> table.store().newTagDeletion(), - () -> table.store().createTagCallbacks()); + () -> table.store().createTagCallbacks(), + table.store().options().tagDefaultTimeRetained()); } @Override @@ -224,6 +225,7 @@ protected OneInputStreamOperator createCommitterOperat table::snapshotManager, table::tagManager, () -> table.store().newTagDeletion(), - () -> table.store().createTagCallbacks()); + () -> table.store().createTagCallbacks(), + table.store().options().tagDefaultTimeRetained()); } } diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java index 4d9753babc34..88878c2c71cf 100644 --- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java +++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java @@ -913,7 +913,7 @@ public void testAddPartitionsForTag() throws Exception { " 'metastore.tag-to-partition' = 'dt'", ")")); tEnv.executeSql("INSERT INTO t VALUES (1, 10), (2, 20)").await(); - tEnv.executeSql("CALL sys.create_tag('test_db.t', '2023-10-16', 1)"); + tEnv.executeSql("CALL sys.create_tag('test_db.t', '2023-10-16', '5 d', 1)"); assertThat(hiveShell.executeQuery("SHOW PARTITIONS t")) .containsExactlyInAnyOrder("dt=2023-10-16"); @@ -927,7 +927,7 @@ public void testAddPartitionsForTag() throws Exception { // another tag tEnv.executeSql("INSERT INTO t VALUES (3, 30), (4, 40)").await(); - tEnv.executeSql("CALL sys.create_tag('test_db.t', '2023-10-17', 2)"); + tEnv.executeSql("CALL sys.create_tag('test_db.t', '2023-10-17', '5 d', 2)"); assertThat(hiveShell.executeQuery("SELECT * FROM t")) .containsExactlyInAnyOrder( @@ -951,9 +951,9 @@ public void testDeletePartitionForTag() throws Exception { + " 'metastore.tag-to-partition' = 'dt'\n" + ")"); tEnv.executeSql("INSERT INTO t VALUES (1, 10), (2, 20)").await(); - tEnv.executeSql("CALL sys.create_tag('test_db.t', '2023-10-16', 1)"); + tEnv.executeSql("CALL sys.create_tag('test_db.t', '2023-10-16', '5 d', 1)"); tEnv.executeSql("INSERT INTO t VALUES (3, 30)").await(); - tEnv.executeSql("CALL sys.create_tag('test_db.t', '2023-10-17', 2)"); + tEnv.executeSql("CALL sys.create_tag('test_db.t', '2023-10-17', '5 d', 2)"); assertThat(hiveShell.executeQuery("SHOW PARTITIONS t")) .containsExactlyInAnyOrder("dt=2023-10-16", "dt=2023-10-17"); @@ -979,7 +979,7 @@ public void testHistoryPartitionsCascadeToUpdate() throws Exception { " 'metastore.tag-to-partition' = 'dt'", ")")); tEnv.executeSql("INSERT INTO t VALUES (1, 10), (2, 20)").await(); - tEnv.executeSql("CALL sys.create_tag('test_db.t', '2023-10-16', 1)"); + tEnv.executeSql("CALL sys.create_tag('test_db.t', '2023-10-16', '5 d', 1)"); assertThat(hiveShell.executeQuery("SHOW PARTITIONS t")) .containsExactlyInAnyOrder("dt=2023-10-16"); @@ -991,7 +991,7 @@ public void testHistoryPartitionsCascadeToUpdate() throws Exception { .containsExactlyInAnyOrder("1\t10\t2023-10-16", "2\t20\t2023-10-16"); tEnv.executeSql("INSERT INTO t VALUES (3, 30), (4, 40)").await(); - tEnv.executeSql("CALL sys.create_tag('test_db.t', '2023-10-17', 2)"); + tEnv.executeSql("CALL sys.create_tag('test_db.t', '2023-10-17', '5 d', 2)"); tEnv.executeSql("ALTER TABLE t ADD z INT"); tEnv.executeSql("INSERT INTO t VALUES (3, 30, 5), (4, 40, 6)").await(); diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateTagProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateTagProcedure.java index 7f9bdb62108b..38d181bac48f 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateTagProcedure.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateTagProcedure.java @@ -18,6 +18,8 @@ package org.apache.paimon.spark.procedure; +import org.apache.paimon.utils.TimeUtils; + import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.connector.catalog.Identifier; import org.apache.spark.sql.connector.catalog.TableCatalog; @@ -26,6 +28,8 @@ import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; +import java.time.Duration; + import static org.apache.spark.sql.types.DataTypes.LongType; import static org.apache.spark.sql.types.DataTypes.StringType; @@ -36,6 +40,7 @@ public class CreateTagProcedure extends BaseProcedure { new ProcedureParameter[] { ProcedureParameter.required("table", StringType), ProcedureParameter.required("tag", StringType), + ProcedureParameter.optional("time_retained", StringType), ProcedureParameter.optional("snapshot", LongType) }; @@ -63,15 +68,17 @@ public StructType outputType() { public InternalRow[] call(InternalRow args) { Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name()); String tag = args.getString(1); - Long snapshot = args.isNullAt(2) ? null : args.getLong(2); + Duration timeRetained = + args.isNullAt(2) ? null : TimeUtils.parseDuration(args.getString(2)); + Long snapshot = args.isNullAt(3) ? null : args.getLong(3); return modifyPaimonTable( tableIdent, table -> { if (snapshot == null) { - table.createTag(tag); + table.createTag(tag, timeRetained); } else { - table.createTag(tag, snapshot); + table.createTag(tag, timeRetained, snapshot); } InternalRow outputRow = newInternalRow(true); return new InternalRow[] {outputRow}; diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CreateAndDeleteTagProcedureTest.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CreateAndDeleteTagProcedureTest.scala index e505ae110033..3c9531a8a7d3 100644 --- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CreateAndDeleteTagProcedureTest.scala +++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CreateAndDeleteTagProcedureTest.scala @@ -70,7 +70,8 @@ class CreateAndDeleteTagProcedureTest extends PaimonSparkTestBase with StreamTes checkAnswer(query(), Row(1, "a") :: Row(2, "b2") :: Nil) checkAnswer( spark.sql( - "CALL paimon.sys.create_tag(table => 'test.T', tag => 'test_tag', snapshot => 2)"), + "CALL paimon.sys.create_tag(" + + "table => 'test.T', tag => 'test_tag', time_retained => '5 d', snapshot => 2)"), Row(true) :: Nil) checkAnswer( spark.sql("SELECT tag_name FROM paimon.test.`T$tags`"),