From c0c4e08e0516ffcaa7b40b3683ae4ffbdc5ce649 Mon Sep 17 00:00:00 2001 From: Jingsong Date: Tue, 16 Apr 2024 10:30:47 +0800 Subject: [PATCH] [core] Adjust codes TTL for tag This closes #3159 --- docs/content/maintenance/manage-tags.md | 6 +- .../generated/core_configuration.html | 2 +- .../java/org/apache/paimon/CoreOptions.java | 3 +- .../org/apache/paimon/AbstractFileStore.java | 1 - .../java/org/apache/paimon/FileStore.java | 1 - .../main/java/org/apache/paimon/Snapshot.java | 8 +- .../paimon/table/AbstractFileStoreTable.java | 14 +- .../apache/paimon/table/ReadonlyTable.java | 11 +- .../java/org/apache/paimon/table/Table.java | 6 +- .../paimon/table/sink/TableCommitImpl.java | 8 +- .../apache/paimon/table/system/TagsTable.java | 10 +- .../main/java/org/apache/paimon/tag/Tag.java | 165 +----------------- .../apache/paimon/tag/TagAutoCreation.java | 47 ++++- .../org/apache/paimon/tag/TagAutoExpire.java | 140 --------------- .../org/apache/paimon/tag/TagAutoManager.java | 15 +- .../org/apache/paimon/tag/TagTimeExpire.java | 82 +++++++++ .../apache/paimon/utils/SnapshotManager.java | 5 +- .../org/apache/paimon/utils/TagManager.java | 71 ++++---- .../paimon/table/system/TagsTableTest.java | 42 +++-- .../java/org/apache/paimon/tag/TagTest.java | 79 --------- .../apache/paimon/utils/TagManagerTest.java | 9 +- .../paimon/flink/action/CreateTagAction.java | 10 +- .../flink/action/CreateTagActionFactory.java | 4 +- .../flink/procedure/CreateTagProcedure.java | 41 ++++- .../flink/action/BranchActionITCase.java | 2 +- .../paimon/flink/action/TagActionITCase.java | 6 +- .../spark/procedure/CreateTagProcedure.java | 10 +- 27 files changed, 283 insertions(+), 515 deletions(-) delete mode 100644 paimon-core/src/main/java/org/apache/paimon/tag/TagAutoExpire.java create mode 100644 paimon-core/src/main/java/org/apache/paimon/tag/TagTimeExpire.java diff --git a/docs/content/maintenance/manage-tags.md b/docs/content/maintenance/manage-tags.md index 26b657ebbd46b..75a4496659fff 100644 --- a/docs/content/maintenance/manage-tags.md +++ b/docs/content/maintenance/manage-tags.md @@ -108,8 +108,8 @@ You can create a tag with given name and snapshot ID. --database \ --table \ --tag_name \ - --time_retained \ [--snapshot ] \ + [--time_retained ] \ [--catalog_conf [--catalog_conf ...]] ``` @@ -127,7 +127,7 @@ public class CreateTag { public static void main(String[] args) { Table table = ...; table.createTag("my-tag", 1); - table.createTag("my-tag-retained-12-hours", Duration.ofHours(12), 1); + table.createTag("my-tag-retained-12-hours", 1, Duration.ofHours(12)); } } ``` @@ -142,7 +142,7 @@ CALL create_tag(table => 'test.t', tag => 'test_tag', snapshot => 2); To create a tag with retained 1 day, run the following sql: ```sql -CALL create_tag(table => 'test.t', tag => 'test_tag', time_retained => '1 d', snapshot => 2); +CALL create_tag(table => 'test.t', tag => 'test_tag', snapshot => 2, time_retained => '1 d'); ``` To create a tag based on the latest snapshot id, run the following sql: diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html index f096271489034..cab6878bb3a3d 100644 --- a/docs/layouts/shortcodes/generated/core_configuration.html +++ b/docs/layouts/shortcodes/generated/core_configuration.html @@ -682,7 +682,7 @@
tag.default-time-retained
(none) Duration - The maximum default time retained for all tags. + The default maximum time retained for newly created tags.
tag.num-retained-max
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java index f3ce61f6d2fdf..a302a3d76df14 100644 --- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java @@ -1018,7 +1018,7 @@ public class CoreOptions implements Serializable { key("tag.default-time-retained") .durationType() .noDefaultValue() - .withDescription("The maximum default time retained for all tags."); + .withDescription("The default maximum time retained for newly created tags."); public static final ConfigOption SNAPSHOT_WATERMARK_IDLE_TIMEOUT = key("snapshot.watermark-idle-timeout") @@ -1649,6 +1649,7 @@ public TagPeriodFormatter tagPeriodFormatter() { return options.get(TAG_PERIOD_FORMATTER); } + @Nullable public Integer tagNumRetainedMax() { return options.get(TAG_NUM_RETAINED_MAX); } diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java index b7830f0ace515..07a73d3de2aa9 100644 --- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java @@ -248,7 +248,6 @@ public PartitionExpire newPartitionExpire(String commitUser) { } @Override - @Nullable public TagAutoManager newTagCreationManager() { return TagAutoManager.create( options, diff --git a/paimon-core/src/main/java/org/apache/paimon/FileStore.java b/paimon-core/src/main/java/org/apache/paimon/FileStore.java index 1d9e247d43925..870feffdef685 100644 --- a/paimon-core/src/main/java/org/apache/paimon/FileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/FileStore.java @@ -92,7 +92,6 @@ public interface FileStore extends Serializable { @Nullable PartitionExpire newPartitionExpire(String commitUser); - @Nullable TagAutoManager newTagCreationManager(); ServiceManager newServiceManager(); diff --git a/paimon-core/src/main/java/org/apache/paimon/Snapshot.java b/paimon-core/src/main/java/org/apache/paimon/Snapshot.java index 0c42c4e430931..fefc41058774b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/Snapshot.java +++ b/paimon-core/src/main/java/org/apache/paimon/Snapshot.java @@ -42,7 +42,6 @@ import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.Optional; /** * This file is the entrance to all data committed at some specific time point. @@ -447,12 +446,13 @@ public static Snapshot fromPath(FileIO fileIO, Path path) { } } - public static Optional safelyFromPath(FileIO fileIO, Path path) throws IOException { + @Nullable + public static Snapshot safelyFromPath(FileIO fileIO, Path path) throws IOException { try { String json = fileIO.readFileUtf8(path); - return Optional.of(Snapshot.fromJson(json)); + return Snapshot.fromJson(json); } catch (FileNotFoundException e) { - return Optional.empty(); + return null; } } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java index 871bba4affaed..1ca321d563a59 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java @@ -488,29 +488,29 @@ public Snapshot createTagInternal(long fromSnapshotId) { @Override public void createTag(String tagName, long fromSnapshotId) { createTag( - tagName, coreOptions().tagDefaultTimeRetained(), createTagInternal(fromSnapshotId)); + tagName, createTagInternal(fromSnapshotId), coreOptions().tagDefaultTimeRetained()); } @Override - public void createTag(String tagName, @Nullable Duration timeRetained, long fromSnapshotId) { - createTag(tagName, timeRetained, createTagInternal(fromSnapshotId)); + public void createTag(String tagName, long fromSnapshotId, Duration timeRetained) { + createTag(tagName, createTagInternal(fromSnapshotId), timeRetained); } @Override public void createTag(String tagName) { Snapshot latestSnapshot = snapshotManager().latestSnapshot(); checkNotNull(latestSnapshot, "Cannot create tag because latest snapshot doesn't exist."); - createTag(tagName, coreOptions().tagDefaultTimeRetained(), latestSnapshot); + createTag(tagName, latestSnapshot, coreOptions().tagDefaultTimeRetained()); } @Override - public void createTag(String tagName, @Nullable Duration timeRetained) { + public void createTag(String tagName, Duration timeRetained) { Snapshot latestSnapshot = snapshotManager().latestSnapshot(); checkNotNull(latestSnapshot, "Cannot create tag because latest snapshot doesn't exist."); - createTag(tagName, timeRetained, latestSnapshot); + createTag(tagName, latestSnapshot, timeRetained); } - private void createTag(String tagName, @Nullable Duration timeRetained, Snapshot fromSnapshot) { + private void createTag(String tagName, Snapshot fromSnapshot, @Nullable Duration timeRetained) { tagManager().createTag(fromSnapshot, tagName, timeRetained, store().createTagCallbacks()); } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java b/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java index be823156c0cd5..be4976f1003e5 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java @@ -18,7 +18,6 @@ package org.apache.paimon.table; -import org.apache.paimon.annotation.Experimental; import org.apache.paimon.stats.Statistics; import org.apache.paimon.table.sink.BatchWriteBuilder; import org.apache.paimon.table.sink.InnerTableCommit; @@ -26,8 +25,6 @@ import org.apache.paimon.table.sink.StreamWriteBuilder; import org.apache.paimon.table.source.InnerStreamTableScan; -import javax.annotation.Nullable; - import java.time.Duration; import java.util.Collections; import java.util.List; @@ -113,8 +110,8 @@ default void createTag(String tagName, long fromSnapshotId) { this.getClass().getSimpleName())); } - @Experimental - default void createTag(String tagName, @Nullable Duration timeRetained, long fromSnapshotId) { + @Override + default void createTag(String tagName, long fromSnapshotId, Duration timeRetained) { throw new UnsupportedOperationException( String.format( "Readonly Table %s does not support createTag.", @@ -129,8 +126,8 @@ default void createTag(String tagName) { this.getClass().getSimpleName())); } - @Experimental - default void createTag(String tagName, @Nullable Duration timeRetained) { + @Override + default void createTag(String tagName, Duration timeRetained) { throw new UnsupportedOperationException( String.format( "Readonly Table %s does not support createTag.", diff --git a/paimon-core/src/main/java/org/apache/paimon/table/Table.java b/paimon-core/src/main/java/org/apache/paimon/table/Table.java index 3d46d25a1a934..3650b773cb5af 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/Table.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/Table.java @@ -26,8 +26,6 @@ import org.apache.paimon.table.source.ReadBuilder; import org.apache.paimon.types.RowType; -import javax.annotation.Nullable; - import java.io.Serializable; import java.time.Duration; import java.util.List; @@ -80,14 +78,14 @@ public interface Table extends Serializable { void createTag(String tagName, long fromSnapshotId); @Experimental - void createTag(String tagName, @Nullable Duration timeRetained, long fromSnapshotId); + void createTag(String tagName, long fromSnapshotId, Duration timeRetained); /** Create a tag from latest snapshot. */ @Experimental void createTag(String tagName); @Experimental - void createTag(String tagName, @Nullable Duration timeRetained); + void createTag(String tagName, Duration timeRetained); /** Delete a tag by name. */ @Experimental diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java index 9d471ee5761f6..eab7a0c6d0364 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java @@ -134,11 +134,9 @@ public boolean forceCreatingSnapshot() { if (this.forceCreatingSnapshot) { return true; } - if (tagAutoManager != null) { - return tagAutoManager.getTagAutoCreation() != null - && tagAutoManager.getTagAutoCreation().forceCreatingSnapshot(); - } - return false; + return tagAutoManager != null + && tagAutoManager.getTagAutoCreation() != null + && tagAutoManager.getTagAutoCreation().forceCreatingSnapshot(); } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java index e51ebcb0d64eb..21c9cf495fb38 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java @@ -45,6 +45,7 @@ import org.apache.paimon.types.TimestampType; import org.apache.paimon.utils.DateTimeUtils; import org.apache.paimon.utils.IteratorRecordReader; +import org.apache.paimon.utils.Pair; import org.apache.paimon.utils.ProjectedRow; import org.apache.paimon.utils.SerializationUtils; import org.apache.paimon.utils.TagManager; @@ -61,7 +62,6 @@ import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.SortedMap; import static org.apache.paimon.catalog.Catalog.SYSTEM_TABLE_SPLITTER; @@ -208,12 +208,10 @@ public RecordReader createReader(Split split) { Options options = new Options(); options.set(CoreOptions.PATH, location.toUri().toString()); FileStoreTable table = FileStoreTableFactory.create(fileIO, options); - SortedMap> tags = table.tagManager().tagsWithTimeRetained(); + List> tags = table.tagManager().tagObjects(); Map nameToSnapshot = new LinkedHashMap<>(); - for (Map.Entry> tag : tags.entrySet()) { - for (String tagName : tag.getValue()) { - nameToSnapshot.put(tagName, tag.getKey()); - } + for (Pair tag : tags) { + nameToSnapshot.put(tag.getValue(), tag.getKey()); } Map> tagBranches = new HashMap<>(); table.branchManager() diff --git a/paimon-core/src/main/java/org/apache/paimon/tag/Tag.java b/paimon-core/src/main/java/org/apache/paimon/tag/Tag.java index 58dbf430b45f9..938bb417defa3 100644 --- a/paimon-core/src/main/java/org/apache/paimon/tag/Tag.java +++ b/paimon-core/src/main/java/org/apache/paimon/tag/Tag.java @@ -34,16 +34,12 @@ import java.io.IOException; import java.time.Duration; import java.time.LocalDateTime; -import java.util.Comparator; import java.util.Map; import java.util.Objects; -import java.util.Optional; /** Snapshot with tagCreateTime and tagTimeRetained. */ public class Tag extends Snapshot { - public static final Comparator TAG_COMPARATOR = new TagComparator(); - private static final String FIELD_TAG_CREATE_TIME = "tagCreateTime"; private static final String FIELD_TAG_TIME_RETAINED = "tagTimeRetained"; @@ -110,6 +106,7 @@ public Tag( return tagTimeRetained; } + @Override public String toJson() { return JsonSerdeUtil.toJson(this); } @@ -127,12 +124,13 @@ public static Tag fromPath(FileIO fileIO, Path path) { } } - public static Optional safelyFromTagPath(FileIO fileIO, Path path) throws IOException { + @Nullable + public static Tag safelyFromPath(FileIO fileIO, Path path) throws IOException { try { String json = fileIO.readFileUtf8(path); - return Optional.of(Tag.fromJson(json)); + return Tag.fromJson(json); } catch (FileNotFoundException e) { - return Optional.empty(); + return null; } } @@ -160,7 +158,7 @@ public static Tag fromSnapshotAndTagTtl( tagTimeRetained); } - public Snapshot toSnapshot() { + public Snapshot trimToSnapshot() { return new Snapshot( version, id, @@ -201,155 +199,4 @@ public boolean equals(Object o) { return Objects.equals(tagCreateTime, that.tagCreateTime) && Objects.equals(tagTimeRetained, that.tagTimeRetained); } - - private static class TagComparator implements Comparator { - @Override - public int compare(Tag tag1, Tag tag2) { - int comparisonResult = 0; - - // Compare id - comparisonResult = Long.compare(tag1.id, tag2.id); - if (comparisonResult != 0) { - return comparisonResult; - } - - // Compare tagCreateTime - if (tag1.tagCreateTime != null && tag2.tagCreateTime != null) { - comparisonResult = tag1.tagCreateTime.compareTo(tag2.tagCreateTime); - if (comparisonResult != 0) { - return comparisonResult; - } - } - - // Compare tagTimeRetained - if (tag1.tagTimeRetained != null && tag2.tagTimeRetained != null) { - comparisonResult = tag1.tagTimeRetained.compareTo(tag2.tagTimeRetained); - } - - // Compare version - if (tag1.version != null && tag2.version != null) { - comparisonResult = Integer.compare(tag1.version, tag2.version); - if (comparisonResult != 0) { - return comparisonResult; - } - } - - // Compare schemaId - comparisonResult = Long.compare(tag1.schemaId, tag2.schemaId); - if (comparisonResult != 0) { - return comparisonResult; - } - - // Compare baseManifestList - if (tag1.baseManifestList != null && tag2.baseManifestList != null) { - comparisonResult = tag1.baseManifestList.compareTo(tag2.baseManifestList); - if (comparisonResult != 0) { - return comparisonResult; - } - } - - // Compare deltaManifestList - if (tag1.deltaManifestList != null && tag2.deltaManifestList != null) { - comparisonResult = tag1.deltaManifestList.compareTo(tag2.deltaManifestList); - if (comparisonResult != 0) { - return comparisonResult; - } - } - - // Compare changelogManifestList - if (tag1.changelogManifestList != null && tag2.changelogManifestList != null) { - comparisonResult = tag1.changelogManifestList.compareTo(tag2.changelogManifestList); - if (comparisonResult != 0) { - return comparisonResult; - } - } - - // Compare indexManifest - if (tag1.indexManifest != null && tag2.indexManifest != null) { - comparisonResult = tag1.indexManifest.compareTo(tag2.indexManifest); - if (comparisonResult != 0) { - return comparisonResult; - } - } - - // Compare commitUser - if (tag1.commitUser != null && tag2.commitUser != null) { - comparisonResult = tag1.commitUser.compareTo(tag2.commitUser); - if (comparisonResult != 0) { - return comparisonResult; - } - } - - // Compare commitIdentifier - comparisonResult = Long.compare(tag1.commitIdentifier, tag2.commitIdentifier); - if (comparisonResult != 0) { - return comparisonResult; - } - - // Compare commitKind - if (tag1.commitKind != null && tag2.commitKind != null) { - comparisonResult = tag1.commitKind.compareTo(tag2.commitKind); - if (comparisonResult != 0) { - return comparisonResult; - } - } - - // Compare timeMillis - comparisonResult = Long.compare(tag1.timeMillis, tag2.timeMillis); - if (comparisonResult != 0) { - return comparisonResult; - } - - // Compare logOffsets - if (tag1.logOffsets != null && tag2.logOffsets != null) { - comparisonResult = Integer.compare(tag1.logOffsets.size(), tag2.logOffsets.size()); - if (comparisonResult != 0) { - return comparisonResult; - } - } - - // Compare totalRecordCount - if (tag1.totalRecordCount != null && tag2.totalRecordCount != null) { - comparisonResult = Long.compare(tag1.totalRecordCount, tag2.totalRecordCount); - if (comparisonResult != 0) { - return comparisonResult; - } - } - - // Compare deltaRecordCount - if (tag1.deltaRecordCount != null && tag2.deltaRecordCount != null) { - comparisonResult = Long.compare(tag1.deltaRecordCount, tag2.deltaRecordCount); - if (comparisonResult != 0) { - return comparisonResult; - } - } - - // Compare changelogRecordCount - if (tag1.changelogRecordCount != null && tag2.changelogRecordCount != null) { - comparisonResult = - Long.compare(tag1.changelogRecordCount, tag2.changelogRecordCount); - if (comparisonResult != 0) { - return comparisonResult; - } - } - - // Compare watermark - if (tag1.watermark != null && tag2.watermark != null) { - comparisonResult = Long.compare(tag1.watermark, tag2.watermark); - if (comparisonResult != 0) { - return comparisonResult; - } - } - - // Compare statistics - if (tag1.statistics != null && tag2.statistics != null) { - comparisonResult = tag1.statistics.compareTo(tag2.statistics); - if (comparisonResult != 0) { - return comparisonResult; - } - } - - return comparisonResult; - } - } } diff --git a/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java b/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java index 41e1e8d16c0d5..409ceb3dc63dc 100644 --- a/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java +++ b/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java @@ -20,12 +20,16 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.Snapshot; +import org.apache.paimon.operation.TagDeletion; import org.apache.paimon.table.sink.TagCallback; import org.apache.paimon.tag.TagTimeExtractor.ProcessTimeExtractor; import org.apache.paimon.tag.TagTimeExtractor.WatermarkExtractor; import org.apache.paimon.utils.SnapshotManager; import org.apache.paimon.utils.TagManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import javax.annotation.Nullable; import java.time.Duration; @@ -43,12 +47,16 @@ /** A manager to create tags automatically. */ public class TagAutoCreation { + private static final Logger LOG = LoggerFactory.getLogger(TagAutoCreation.class); + private final SnapshotManager snapshotManager; private final TagManager tagManager; + private final TagDeletion tagDeletion; private final TagTimeExtractor timeExtractor; private final TagPeriodHandler periodHandler; private final Duration delay; - private final Duration timeRetained; + @Nullable private final Integer numRetainedMax; + @Nullable private final Duration defaultTimeRetained; private final List callbacks; private final Duration idlenessTimeout; @@ -58,18 +66,22 @@ public class TagAutoCreation { private TagAutoCreation( SnapshotManager snapshotManager, TagManager tagManager, + TagDeletion tagDeletion, TagTimeExtractor timeExtractor, TagPeriodHandler periodHandler, Duration delay, - @Nullable Duration timeRetained, + @Nullable Integer numRetainedMax, + @Nullable Duration defaultTimeRetained, Duration idlenessTimeout, List callbacks) { this.snapshotManager = snapshotManager; this.tagManager = tagManager; + this.tagDeletion = tagDeletion; this.timeExtractor = timeExtractor; this.periodHandler = periodHandler; this.delay = delay; - this.timeRetained = timeRetained; + this.numRetainedMax = numRetainedMax; + this.defaultTimeRetained = defaultTimeRetained; this.callbacks = callbacks; this.idlenessTimeout = idlenessTimeout; @@ -140,8 +152,32 @@ private void tryToCreateTags(Snapshot snapshot) { || isAfterOrEqual(time.minus(delay), periodHandler.nextTagTime(nextTag))) { LocalDateTime thisTag = periodHandler.normalizeToPreviousTag(time); String tagName = periodHandler.timeToTag(thisTag); - tagManager.createTag(snapshot, tagName, timeRetained, callbacks); + tagManager.createTag(snapshot, tagName, defaultTimeRetained, callbacks); nextTag = periodHandler.nextTagTime(thisTag); + + if (numRetainedMax != null) { + // only handle auto-created tags here + SortedMap> tags = tagManager.tags(periodHandler::isAutoTag); + if (tags.size() > numRetainedMax) { + int toDelete = tags.size() - numRetainedMax; + int i = 0; + for (List tag : tags.values()) { + LOG.info( + "Delete tag {}, because the number of auto-created tags reached numRetainedMax of {}.", + tagName, + numRetainedMax); + tagManager.deleteTag( + checkAndGetOneAutoTag(tag), + tagDeletion, + snapshotManager, + callbacks); + i++; + if (i == toDelete) { + break; + } + } + } + } } } @@ -163,6 +199,7 @@ public static TagAutoCreation create( CoreOptions options, SnapshotManager snapshotManager, TagManager tagManager, + TagDeletion tagDeletion, List callbacks) { TagTimeExtractor extractor = TagTimeExtractor.createForAutoTag(options); if (extractor == null) { @@ -171,9 +208,11 @@ public static TagAutoCreation create( return new TagAutoCreation( snapshotManager, tagManager, + tagDeletion, extractor, TagPeriodHandler.create(options), options.tagCreationDelay(), + options.tagNumRetainedMax(), options.tagDefaultTimeRetained(), options.snapshotWatermarkIdleTimeout(), callbacks); diff --git a/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoExpire.java b/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoExpire.java deleted file mode 100644 index e31a926b57105..0000000000000 --- a/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoExpire.java +++ /dev/null @@ -1,140 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.paimon.tag; - -import org.apache.paimon.CoreOptions; -import org.apache.paimon.Snapshot; -import org.apache.paimon.operation.TagDeletion; -import org.apache.paimon.table.sink.TagCallback; -import org.apache.paimon.utils.SnapshotManager; -import org.apache.paimon.utils.TagManager; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.time.Duration; -import java.time.LocalDateTime; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.SortedMap; - -/** A manager to expire tags. */ -public class TagAutoExpire { - - private static final Logger LOG = LoggerFactory.getLogger(TagAutoExpire.class); - - private final SnapshotManager snapshotManager; - private final TagManager tagManager; - private final TagDeletion tagDeletion; - private final TagPeriodHandler periodHandler; - private final Integer numRetainedMax; - private final List callbacks; - - private TagAutoExpire( - SnapshotManager snapshotManager, - TagManager tagManager, - TagDeletion tagDeletion, - TagPeriodHandler periodHandler, - Duration delay, - Integer numRetainedMax, - List callbacks) { - this.snapshotManager = snapshotManager; - this.tagManager = tagManager; - this.tagDeletion = tagDeletion; - this.periodHandler = periodHandler; - this.numRetainedMax = numRetainedMax; - this.callbacks = callbacks; - this.periodHandler.validateDelay(delay); - } - - public void run() { - Set deleteTags = new HashSet<>(); - deleteTags.addAll(getExpireTagsByNumRetainedMax()); - deleteTags.addAll(getExpireTagsByTimeRetained()); - deleteTags.forEach( - tag -> tagManager.deleteTag(tag, tagDeletion, snapshotManager, callbacks)); - } - - private Set getExpireTagsByNumRetainedMax() { - Set deleteTags = new HashSet<>(); - if (numRetainedMax != null) { - // only handle auto-created tags here - SortedMap> tags = tagManager.tags(periodHandler::isAutoTag); - if (tags.size() > numRetainedMax) { - int toDelete = tags.size() - numRetainedMax; - int i = 0; - for (List tag : tags.values()) { - String tagName = TagAutoCreation.checkAndGetOneAutoTag(tag); - LOG.info( - "Delete tag {}, because the number of auto-created tags reached numRetainedMax of {}.", - tagName, - numRetainedMax); - deleteTags.add(tagName); - i++; - if (i == toDelete) { - break; - } - } - } - } - return deleteTags; - } - - private Set getExpireTagsByTimeRetained() { - // handle auto-created and non-auto-created-tags here - Set deleteTags = new HashSet<>(); - SortedMap> tags = tagManager.tagsWithTimeRetained(); - for (Map.Entry> entry : tags.entrySet()) { - Tag tag = entry.getKey(); - LocalDateTime createTime = tag.getTagCreateTime(); - Duration timeRetained = tag.getTagTimeRetained(); - if (createTime == null || timeRetained == null) { - continue; - } - if (LocalDateTime.now().isAfter(createTime.plus(timeRetained))) { - for (String tagName : entry.getValue()) { - LOG.info( - "Delete tag {}, because its existence time has reached its timeRetained of {}.", - tagName, - timeRetained); - deleteTags.add(tagName); - } - } - } - return deleteTags; - } - - public static TagAutoExpire create( - CoreOptions options, - SnapshotManager snapshotManager, - TagManager tagManager, - TagDeletion tagDeletion, - List callbacks) { - return new TagAutoExpire( - snapshotManager, - tagManager, - tagDeletion, - TagPeriodHandler.create(options), - options.tagCreationDelay(), - options.tagNumRetainedMax(), - callbacks); - } -} diff --git a/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoManager.java b/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoManager.java index a991559479a9f..387a3e746adcd 100644 --- a/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoManager.java @@ -30,19 +30,19 @@ public class TagAutoManager { private final TagAutoCreation tagAutoCreation; - private final TagAutoExpire tagAutoExpire; + private final TagTimeExpire tagTimeExpire; - private TagAutoManager(TagAutoCreation tagAutoCreation, TagAutoExpire tagAutoExpire) { + private TagAutoManager(TagAutoCreation tagAutoCreation, TagTimeExpire tagTimeExpire) { this.tagAutoCreation = tagAutoCreation; - this.tagAutoExpire = tagAutoExpire; + this.tagTimeExpire = tagTimeExpire; } public void run() { if (tagAutoCreation != null) { tagAutoCreation.run(); } - if (tagAutoExpire != null) { - tagAutoExpire.run(); + if (tagTimeExpire != null) { + tagTimeExpire.run(); } } @@ -57,8 +57,9 @@ public static TagAutoManager create( return new TagAutoManager( extractor == null ? null - : TagAutoCreation.create(options, snapshotManager, tagManager, callbacks), - TagAutoExpire.create(options, snapshotManager, tagManager, tagDeletion, callbacks)); + : TagAutoCreation.create( + options, snapshotManager, tagManager, tagDeletion, callbacks), + TagTimeExpire.create(snapshotManager, tagManager, tagDeletion, callbacks)); } public TagAutoCreation getTagAutoCreation() { diff --git a/paimon-core/src/main/java/org/apache/paimon/tag/TagTimeExpire.java b/paimon-core/src/main/java/org/apache/paimon/tag/TagTimeExpire.java new file mode 100644 index 0000000000000..d4797c0cb0568 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/tag/TagTimeExpire.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.tag; + +import org.apache.paimon.operation.TagDeletion; +import org.apache.paimon.table.sink.TagCallback; +import org.apache.paimon.utils.Pair; +import org.apache.paimon.utils.SnapshotManager; +import org.apache.paimon.utils.TagManager; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.time.LocalDateTime; +import java.util.List; + +/** A manager to expire tags by time. */ +public class TagTimeExpire { + + private static final Logger LOG = LoggerFactory.getLogger(TagTimeExpire.class); + + private final SnapshotManager snapshotManager; + private final TagManager tagManager; + private final TagDeletion tagDeletion; + private final List callbacks; + + private TagTimeExpire( + SnapshotManager snapshotManager, + TagManager tagManager, + TagDeletion tagDeletion, + List callbacks) { + this.snapshotManager = snapshotManager; + this.tagManager = tagManager; + this.tagDeletion = tagDeletion; + this.callbacks = callbacks; + } + + public void run() { + List> tags = tagManager.tagObjects(); + for (Pair pair : tags) { + Tag tag = pair.getLeft(); + String tagName = pair.getRight(); + LocalDateTime createTime = tag.getTagCreateTime(); + Duration timeRetained = tag.getTagTimeRetained(); + if (createTime == null || timeRetained == null) { + continue; + } + if (LocalDateTime.now().isAfter(createTime.plus(timeRetained))) { + LOG.info( + "Delete tag {}, because its existence time has reached its timeRetained of {}.", + tagName, + timeRetained); + tagManager.deleteTag(tagName, tagDeletion, snapshotManager, callbacks); + } + } + } + + public static TagTimeExpire create( + SnapshotManager snapshotManager, + TagManager tagManager, + TagDeletion tagDeletion, + List callbacks) { + return new TagTimeExpire(snapshotManager, tagManager, tagDeletion, callbacks); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java index 174b4233ccccc..dbbc8fffdc050 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java @@ -394,7 +394,10 @@ public List safelyGetAllSnapshots() throws IOException { List snapshots = new ArrayList<>(); for (Path path : paths) { - Snapshot.safelyFromPath(fileIO, path).ifPresent(snapshots::add); + Snapshot snapshot = Snapshot.safelyFromPath(fileIO, path); + if (snapshot != null) { + snapshots.add(snapshot); + } } return snapshots; diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java index ce6f0202093d6..8b7818fed782e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java @@ -242,7 +242,8 @@ public boolean tagExists(String tagName) { /** Get the tagged snapshot by name. */ public Snapshot taggedSnapshot(String tagName) { checkArgument(tagExists(tagName), "Tag '%s' doesn't exist.", tagName); - return Tag.fromPath(fileIO, tagPath(tagName)).toSnapshot(); + // Trim to snapshot to avoid equals and compare snapshot. + return Tag.fromPath(fileIO, tagPath(tagName)).trimToSnapshot(); } public long tagCount() { @@ -258,41 +259,25 @@ public List taggedSnapshots() { return new ArrayList<>(tags().keySet()); } - /** Get all tag sorted by Tag. */ - public SortedMap> tagsWithTimeRetained() { - return tagsWithFilter(tagName -> true); - } - /** Get all tagged snapshots with tag names sorted by snapshot id. */ public SortedMap> tags() { return tags(tagName -> true); } - public SortedMap> tags(Predicate filter) { - SortedMap> sortedTagMap = - new TreeMap<>(Comparator.comparingLong(Snapshot::id)); - SortedMap> tags = tagsWithFilter(filter); - tags.forEach( - (key, value) -> - sortedTagMap - .computeIfAbsent(key.toSnapshot(), tagNames -> new ArrayList<>()) - .addAll(value)); - return sortedTagMap; - } - /** - * Retrieves a sorted map of tags filtered based on a provided predicate. The predicate - * determines which tag names should be included in the result. Only tags with tag names that - * pass the predicate test are included. + * Retrieves a sorted map of snapshots filtered based on a provided predicate. The predicate + * determines which tag names should be included in the result. Only snapshots with tag names + * that pass the predicate test are included. * - * @param filter A Predicate that tests each tag name. Tags with tag names that fail the test - * are excluded from the result. - * @return A sorted map of filtered tags keyed by Tag.TAG_COMPARATOR, each associated with its - * tag name. - * @throws RuntimeException if an IOException occurs during retrieval of tags. + * @param filter A Predicate that tests each tag name. Snapshots with tag names that fail the + * test are excluded from the result. + * @return A sorted map of filtered snapshots keyed by their IDs, each associated with its tag + * name. + * @throws RuntimeException if an IOException occurs during retrieval of snapshots. */ - public SortedMap> tagsWithFilter(Predicate filter) { - TreeMap> tags = new TreeMap<>(Tag.TAG_COMPARATOR); + public SortedMap> tags(Predicate filter) { + TreeMap> tags = + new TreeMap<>(Comparator.comparingLong(Snapshot::id)); try { List paths = listVersionedFileStatus(fileIO, tagDirectory(), TAG_PREFIX) @@ -307,11 +292,10 @@ public SortedMap> tagsWithFilter(Predicate filter) { } // If the tag file is not found, it might be deleted by // other processes, so just skip this tag - Tag.safelyFromTagPath(fileIO, path) - .ifPresent( - tag -> - tags.computeIfAbsent(tag, s -> new ArrayList<>()) - .add(tagName)); + Snapshot snapshot = Snapshot.safelyFromPath(fileIO, path); + if (snapshot != null) { + tags.computeIfAbsent(snapshot, s -> new ArrayList<>()).add(tagName); + } } } catch (IOException e) { throw new RuntimeException(e); @@ -319,6 +303,27 @@ public SortedMap> tagsWithFilter(Predicate filter) { return tags; } + /** Get all {@link Tag}s. */ + public List> tagObjects() { + try { + List paths = + listVersionedFileStatus(fileIO, tagDirectory(), TAG_PREFIX) + .map(FileStatus::getPath) + .collect(Collectors.toList()); + List> tags = new ArrayList<>(); + for (Path path : paths) { + String tagName = path.getName().substring(TAG_PREFIX.length()); + Tag tag = Tag.safelyFromPath(fileIO, path); + if (tag != null) { + tags.add(Pair.of(tag, tagName)); + } + } + return tags; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + public List sortTagsOfOneSnapshot(List tagNames) { return tagNames.stream() .map( diff --git a/paimon-core/src/test/java/org/apache/paimon/table/system/TagsTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/system/TagsTableTest.java index b0d357853b6eb..4b381a6011a03 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/system/TagsTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/system/TagsTableTest.java @@ -32,6 +32,7 @@ import org.apache.paimon.tag.Tag; import org.apache.paimon.types.DataTypes; import org.apache.paimon.utils.DateTimeUtils; +import org.apache.paimon.utils.Pair; import org.apache.paimon.utils.TagManager; import org.junit.jupiter.api.BeforeEach; @@ -42,7 +43,6 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; -import java.util.Map; import java.util.function.Function; import static org.assertj.core.api.Assertions.assertThat; @@ -120,28 +120,26 @@ void testTagBranchesTable() throws Exception { private List getExceptedResult( Function> tagBranchesFunction) { List internalRows = new ArrayList<>(); - for (Map.Entry> snapshot : tagManager.tagsWithTimeRetained().entrySet()) { + for (Pair snapshot : tagManager.tagObjects()) { Tag tag = snapshot.getKey(); - for (String tagName : snapshot.getValue()) { - internalRows.add( - GenericRow.of( - BinaryString.fromString(tagName), - tag.id(), - tag.schemaId(), - Timestamp.fromLocalDateTime( - DateTimeUtils.toLocalDateTime(tag.timeMillis())), - tag.totalRecordCount(), - BinaryString.fromString( - tagBranchesFunction.apply(tagName).toString()), - Timestamp.fromLocalDateTime( - tag.getTagCreateTime() == null - ? LocalDateTime.MIN - : tag.getTagCreateTime()), - BinaryString.fromString( - tag.getTagTimeRetained() == null - ? "" - : tag.getTagTimeRetained().toString()))); - } + String tagName = snapshot.getValue(); + internalRows.add( + GenericRow.of( + BinaryString.fromString(tagName), + tag.id(), + tag.schemaId(), + Timestamp.fromLocalDateTime( + DateTimeUtils.toLocalDateTime(tag.timeMillis())), + tag.totalRecordCount(), + BinaryString.fromString(tagBranchesFunction.apply(tagName).toString()), + Timestamp.fromLocalDateTime( + tag.getTagCreateTime() == null + ? LocalDateTime.MIN + : tag.getTagCreateTime()), + BinaryString.fromString( + tag.getTagTimeRetained() == null + ? "" + : tag.getTagTimeRetained().toString()))); } return internalRows; } diff --git a/paimon-core/src/test/java/org/apache/paimon/tag/TagTest.java b/paimon-core/src/test/java/org/apache/paimon/tag/TagTest.java index 922055983b044..3198366dd3e54 100644 --- a/paimon-core/src/test/java/org/apache/paimon/tag/TagTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/tag/TagTest.java @@ -25,8 +25,6 @@ import java.time.Duration; import java.time.LocalDateTime; -import java.util.ArrayList; -import java.util.List; /** Test for {@link Tag}. */ public class TagTest { @@ -101,81 +99,4 @@ public void testFromSnapshotAndTagTtl() { Tag newTag = Tag.fromJson(tagJson); Assert.assertEquals(tag, newTag); } - - @Test - public void testTagComparator() { - Tag tag1 = - new Tag( - 3, - 2L, - 0, - null, - null, - null, - null, - null, - 8, - Snapshot.CommitKind.APPEND, - 1000, - null, - null, - null, - null, - null, - null, - LocalDateTime.now(), - Duration.ofSeconds(10)); - - Tag tag2 = - new Tag( - 3, - 1L, - 0, - null, - null, - null, - null, - null, - 8, - Snapshot.CommitKind.APPEND, - 1000, - null, - null, - null, - null, - null, - null, - LocalDateTime.now(), - Duration.ofSeconds(10)); - - Tag tag3 = - new Tag( - 3, - 0L, - 0, - null, - null, - null, - null, - null, - 8, - Snapshot.CommitKind.APPEND, - 1000, - null, - null, - null, - null, - null, - null, - LocalDateTime.now(), - Duration.ofSeconds(10)); - List tags = new ArrayList<>(); - tags.add(tag1); - tags.add(tag2); - tags.add(tag3); - tags.sort(Tag.TAG_COMPARATOR); - Assert.assertEquals(0, tags.get(0).id()); - Assert.assertEquals(1, tags.get(1).id()); - Assert.assertEquals(2, tags.get(2).id()); - } } diff --git a/paimon-core/src/test/java/org/apache/paimon/utils/TagManagerTest.java b/paimon-core/src/test/java/org/apache/paimon/utils/TagManagerTest.java index 12f38931ded18..3e702b9b2cd0c 100644 --- a/paimon-core/src/test/java/org/apache/paimon/utils/TagManagerTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/utils/TagManagerTest.java @@ -46,7 +46,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.SortedMap; import java.util.concurrent.ThreadLocalRandom; import static org.apache.paimon.operation.FileStoreTestUtils.commitData; @@ -122,14 +121,14 @@ public void testCreateTagWithTimeRetained() throws Exception { tagManager.createTag( snapshotManager.snapshot(1), "tag", Duration.ofDays(1), Collections.emptyList()); assertThat(tagManager.tagExists("tag")).isTrue(); - SortedMap> tagsWithTimeRetained = tagManager.tagsWithTimeRetained(); - Assertions.assertEquals(1, tagsWithTimeRetained.size()); - Tag tag = tagsWithTimeRetained.firstKey(); + List> tags = tagManager.tagObjects(); + Assertions.assertEquals(1, tags.size()); + Tag tag = tags.get(0).getKey(); String tagJson = tag.toJson(); Assertions.assertTrue( tagJson.contains("tagCreateTime") && tagJson.contains("tagTimeRetained")); Assertions.assertEquals(tag, Tag.fromJson(tagJson)); - assertThat(tagsWithTimeRetained.get(tag)).contains("tag"); + assertThat(tags.get(0).getValue()).contains("tag"); } private TestFileStore createStore(TestKeyValueGenerator.GeneratorMode mode, int buckets) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagAction.java index f1455d3774e4d..cfc9b558b40c3 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagAction.java @@ -27,8 +27,8 @@ public class CreateTagAction extends TableActionBase { private final String tagName; - private final Duration timeRetained; - private final Long snapshotId; + private final @Nullable Long snapshotId; + private final @Nullable Duration timeRetained; public CreateTagAction( String warehouse, @@ -36,8 +36,8 @@ public CreateTagAction( String tableName, Map catalogConfig, String tagName, - @Nullable Duration timeRetained, - Long snapshotId) { + @Nullable Long snapshotId, + @Nullable Duration timeRetained) { super(warehouse, databaseName, tableName, catalogConfig); this.tagName = tagName; this.timeRetained = timeRetained; @@ -49,7 +49,7 @@ public void run() throws Exception { if (snapshotId == null) { table.createTag(tagName, timeRetained); } else { - table.createTag(tagName, timeRetained, snapshotId); + table.createTag(tagName, snapshotId, timeRetained); } } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagActionFactory.java index 6cca76cb141ef..7769fa1d792f7 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagActionFactory.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagActionFactory.java @@ -65,8 +65,8 @@ public Optional create(MultipleParameterToolAdapter params) { tablePath.f2, catalogConfig, tagName, - timeRetained, - snapshot); + snapshot, + timeRetained); return Optional.of(action); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateTagProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateTagProcedure.java index 3bdc2e07edc30..1a7b03ef65127 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateTagProcedure.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateTagProcedure.java @@ -27,45 +27,70 @@ import javax.annotation.Nullable; +import java.time.Duration; + /** * Create tag procedure. Usage: * *

- *  CALL sys.create_tag('tableId', 'tagName', 'timeRetained', snapshotId)
+ *  CALL sys.create_tag('tableId', 'tagName', snapshotId, 'timeRetained')
  * 
*/ public class CreateTagProcedure extends ProcedureBase { public static final String IDENTIFIER = "create_tag"; + public String[] call( + ProcedureContext procedureContext, String tableId, String tagName, long snapshotId) + throws Catalog.TableNotExistException { + return innerCall(tableId, tagName, snapshotId, null); + } + + public String[] call(ProcedureContext procedureContext, String tableId, String tagName) + throws Catalog.TableNotExistException { + return innerCall(tableId, tagName, null, null); + } + public String[] call( ProcedureContext procedureContext, String tableId, String tagName, - String timeRetained, - long snapshotId) + long snapshotId, + String timeRetained) throws Catalog.TableNotExistException { - return innerCall(tableId, tagName, timeRetained, snapshotId); + return innerCall(tableId, tagName, snapshotId, timeRetained); } public String[] call( ProcedureContext procedureContext, String tableId, String tagName, String timeRetained) throws Catalog.TableNotExistException { - return innerCall(tableId, tagName, timeRetained, null); + return innerCall(tableId, tagName, null, timeRetained); } private String[] innerCall( - String tableId, String tagName, String timeRetained, @Nullable Long snapshotId) + String tableId, + String tagName, + @Nullable Long snapshotId, + @Nullable String timeRetained) throws Catalog.TableNotExistException { Table table = catalog.getTable(Identifier.fromString(tableId)); if (snapshotId == null) { - table.createTag(tagName, TimeUtils.parseDuration(timeRetained)); + table.createTag(tagName, toDuration(timeRetained)); } else { - table.createTag(tagName, TimeUtils.parseDuration(timeRetained), snapshotId); + table.createTag(tagName, snapshotId, toDuration(timeRetained)); } return new String[] {"Success"}; } + @Nullable + private static Duration toDuration(@Nullable String s) { + if (s == null) { + return null; + } + + return TimeUtils.parseDuration(s); + } + @Override public String identifier() { return IDENTIFIER; diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java index eb6f70f655d30..7a6361657f599 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java @@ -65,7 +65,7 @@ void testCreateAndDeleteBranch() throws Exception { TagManager tagManager = new TagManager(table.fileIO(), table.location()); callProcedure( String.format( - "CALL sys.create_tag('%s.%s', 'tag2', '5 d', 2)", database, tableName)); + "CALL sys.create_tag('%s.%s', 'tag2', 2, '5 d')", database, tableName)); assertThat(tagManager.tagExists("tag2")).isTrue(); BranchManager branchManager = table.branchManager(); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/TagActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/TagActionITCase.java index a650ad31b3478..1fda583d2de31 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/TagActionITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/TagActionITCase.java @@ -83,8 +83,7 @@ public void testCreateAndDeleteTag() throws Exception { .run(); } else { callProcedure( - String.format( - "CALL sys.create_tag('%s.%s', 'tag2', '5 d', 2)", database, tableName)); + String.format("CALL sys.create_tag('%s.%s', 'tag2', 2)", database, tableName)); } assertThat(tagManager.tagExists("tag2")).isTrue(); @@ -154,8 +153,7 @@ public void testCreateLatestTag() throws Exception { .run(); } else { callProcedure( - String.format( - "CALL sys.create_tag('%s.%s', 'tag2', '5 d', 2)", database, tableName)); + String.format("CALL sys.create_tag('%s.%s', 'tag2', 2)", database, tableName)); } assertThat(tagManager.tagExists("tag2")).isTrue(); } diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateTagProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateTagProcedure.java index 38d181bac48f9..b3f863c5e305a 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateTagProcedure.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateTagProcedure.java @@ -40,8 +40,8 @@ public class CreateTagProcedure extends BaseProcedure { new ProcedureParameter[] { ProcedureParameter.required("table", StringType), ProcedureParameter.required("tag", StringType), - ProcedureParameter.optional("time_retained", StringType), - ProcedureParameter.optional("snapshot", LongType) + ProcedureParameter.optional("snapshot", LongType), + ProcedureParameter.optional("time_retained", StringType) }; private static final StructType OUTPUT_TYPE = @@ -68,9 +68,9 @@ public StructType outputType() { public InternalRow[] call(InternalRow args) { Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name()); String tag = args.getString(1); + Long snapshot = args.isNullAt(2) ? null : args.getLong(2); Duration timeRetained = - args.isNullAt(2) ? null : TimeUtils.parseDuration(args.getString(2)); - Long snapshot = args.isNullAt(3) ? null : args.getLong(3); + args.isNullAt(3) ? null : TimeUtils.parseDuration(args.getString(3)); return modifyPaimonTable( tableIdent, @@ -78,7 +78,7 @@ public InternalRow[] call(InternalRow args) { if (snapshot == null) { table.createTag(tag, timeRetained); } else { - table.createTag(tag, timeRetained, snapshot); + table.createTag(tag, snapshot, timeRetained); } InternalRow outputRow = newInternalRow(true); return new InternalRow[] {outputRow};