diff --git a/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java b/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java index bf97aa3f10b6..c4e9f794880a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java +++ b/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java @@ -67,8 +67,7 @@ private TagAutoCreation( this.periodHandler.validateDelay(delay); - SortedMap tags = tagManager.tags(); - tags.entrySet().removeIf(entry -> entry.getValue().startsWith("savepoint")); + SortedMap tags = tagManager.tags(t -> !t.startsWith("savepoint")); if (tags.isEmpty()) { this.nextSnapshot = @@ -123,7 +122,7 @@ private void tryToTag(Snapshot snapshot) { nextTag = periodHandler.nextTagTime(thisTag); if (numRetainedMax != null) { - SortedMap tags = tagManager.tags(); + SortedMap tags = tagManager.tags(t -> !t.startsWith("savepoint")); if (tags.size() > numRetainedMax) { int toDelete = tags.size() - numRetainedMax; int i = 0; diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java index 927dbd373520..bd526ee2b0a1 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java @@ -179,6 +179,21 @@ public List taggedSnapshots() { /** Get all tagged snapshots with names sorted by snapshot id. */ public SortedMap tags() { + return tags(tagName -> true); + } + + /** + * Retrieves a sorted map of snapshots filtered based on a provided predicate. The predicate + * determines which tag names should be included in the result. Only snapshots with tag names + * that pass the predicate test are included. + * + * @param filter A Predicate that tests each tag name. Snapshots with tag names that fail the + * test are excluded from the result. + * @return A sorted map of filtered snapshots keyed by their IDs, each associated with its tag + * name. + * @throws RuntimeException if an IOException occurs during retrieval of snapshots. + */ + public SortedMap tags(Predicate filter) { TreeMap tags = new TreeMap<>(Comparator.comparingLong(Snapshot::id)); try { List paths = @@ -187,14 +202,15 @@ public SortedMap tags() { .collect(Collectors.toList()); for (Path path : paths) { + String tagName = path.getName().substring(TAG_PREFIX.length()); + + if (!filter.test(tagName)) { + continue; + } // If the tag file is not found, it might be deleted by // other processes, so just skip this tag Snapshot.safelyFromPath(fileIO, path) - .ifPresent( - snapshot -> - tags.put( - snapshot, - path.getName().substring(TAG_PREFIX.length()))); + .ifPresent(snapshot -> tags.put(snapshot, tagName)); } } catch (IOException e) { throw new RuntimeException(e); diff --git a/paimon-core/src/test/java/org/apache/paimon/tag/TagAutoCreationTest.java b/paimon-core/src/test/java/org/apache/paimon/tag/TagAutoCreationTest.java index 53dd6aed28a4..3b753093b857 100644 --- a/paimon-core/src/test/java/org/apache/paimon/tag/TagAutoCreationTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/tag/TagAutoCreationTest.java @@ -215,6 +215,37 @@ public void testModifyTagPeriod() { assertThat(tagManager.tags().values()).contains("2023-07-18 11", "2023-07-19"); } + @Test + public void testSavepointTag() { + Options options = new Options(); + options.set(TAG_AUTOMATIC_CREATION, TagCreationMode.WATERMARK); + options.set(TAG_CREATION_PERIOD, TagCreationPeriod.HOURLY); + options.set(TAG_NUM_RETAINED_MAX, 3); + FileStoreTable table; + TableCommitImpl commit; + TagManager tagManager; + table = this.table.copy(options.toMap()); + + commit = table.newCommit(commitUser).ignoreEmptyCommit(false); + tagManager = table.store().newTagManager(); + + // test normal creation + commit.commit(new ManifestCommittable(0, utcMills("2023-07-18T12:12:00"))); + assertThat(tagManager.tags().values()).containsOnly("2023-07-18 11"); + + table.createTag("savepoint-11", 1); + + // test newCommit create + commit.commit(new ManifestCommittable(1, utcMills("2023-07-18T14:00:00"))); + assertThat(tagManager.tags().values()).contains("2023-07-18 11", "2023-07-18 13"); + + // test expire old tag + commit.commit(new ManifestCommittable(2, utcMills("2023-07-18T15:00:00"))); + commit.commit(new ManifestCommittable(3, utcMills("2023-07-18T16:00:00"))); + assertThat(tagManager.tags().values()) + .containsOnly("savepoint-11", "2023-07-18 13", "2023-07-18 14", "2023-07-18 15"); + } + private long localZoneMills(String timestamp) { return LocalDateTime.parse(timestamp) .atZone(ZoneId.systemDefault())